// mz_sql/plan/statement/ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL).
11//!
12//! This module houses the handlers for statements that modify the catalog, like
13//! `ALTER`, `CREATE`, and `DROP`.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::{Either, Itertools};
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::soft_panic_or_log;
32use mz_ore::str::StrExt;
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43    preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
59    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
60    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
61    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
62    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
63    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
64    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
65    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
66    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
67    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
68    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
69    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
70    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
71    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
72    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
73    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91    SinkEnvelope, StorageSinkConnection,
92};
93use mz_storage_types::sources::encoding::{
94    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95    SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111    PostgresSourceConnection, PostgresSourcePublicationDetails,
112    ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121    SqlServerSourceExtras, Timeline,
122};
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
141    scalar_type_from_sql,
142};
143use crate::plan::scope::Scope;
144use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
145use crate::plan::statement::{StatementContext, StatementDesc, scl};
146use crate::plan::typeconv::CastContext;
147use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
148use crate::plan::{
149    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
150    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
151    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
152    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
153    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
154    AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
155    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
156    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
157    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
158    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
159    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
160    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
161    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
162    MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
163    PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
164    Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
165    WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
166    transform_ast,
167};
168use crate::session::vars::{
169    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
170    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
171    ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS,
172};
173use crate::{names, parse};
174
175mod connection;
176
// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
//
// Upper bound on the number of columns in relations created by the planners in this
// module (enforced, e.g., for webhook sources in `plan_create_webhook_source` below).
const MAX_NUM_COLUMNS: usize = 256;
182
183static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
184    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
185
186/// Given a relation desc and a column list, checks that:
187/// - the column list is a prefix of the desc;
188/// - all the listed columns are types that have meaningful Persist-level ordering.
189fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
190    if partition_by.len() > desc.len() {
191        tracing::error!(
192            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
193            desc.len(),
194            partition_by.len()
195        );
196        partition_by.truncate(desc.len());
197    }
198
199    let desc_prefix = desc.iter().take(partition_by.len());
200    for (idx, ((desc_name, desc_type), partition_name)) in
201        desc_prefix.zip_eq(partition_by).enumerate()
202    {
203        let partition_name = normalize::column_name(partition_name);
204        if *desc_name != partition_name {
205            sql_bail!(
206                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
207            );
208        }
209        if !preserves_order(&desc_type.scalar_type) {
210            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
211        }
212    }
213    Ok(())
214}
215
216pub fn describe_create_database(
217    _: &StatementContext,
218    _: CreateDatabaseStatement,
219) -> Result<StatementDesc, PlanError> {
220    Ok(StatementDesc::new(None))
221}
222
223pub fn plan_create_database(
224    _: &StatementContext,
225    CreateDatabaseStatement {
226        name,
227        if_not_exists,
228    }: CreateDatabaseStatement,
229) -> Result<Plan, PlanError> {
230    Ok(Plan::CreateDatabase(CreateDatabasePlan {
231        name: normalize::ident(name.0),
232        if_not_exists,
233    }))
234}
235
236pub fn describe_create_schema(
237    _: &StatementContext,
238    _: CreateSchemaStatement,
239) -> Result<StatementDesc, PlanError> {
240    Ok(StatementDesc::new(None))
241}
242
243pub fn plan_create_schema(
244    scx: &StatementContext,
245    CreateSchemaStatement {
246        mut name,
247        if_not_exists,
248    }: CreateSchemaStatement,
249) -> Result<Plan, PlanError> {
250    if name.0.len() > 2 {
251        sql_bail!("schema name {} has more than two components", name);
252    }
253    let schema_name = normalize::ident(
254        name.0
255            .pop()
256            .expect("names always have at least one component"),
257    );
258    let database_spec = match name.0.pop() {
259        None => match scx.catalog.active_database() {
260            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
261            None => sql_bail!("no database specified and no active database"),
262        },
263        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
264            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
265            Err(_) => sql_bail!("invalid database {}", n.as_str()),
266        },
267    };
268    Ok(Plan::CreateSchema(CreateSchemaPlan {
269        database_spec,
270        schema_name,
271        if_not_exists,
272    }))
273}
274
275pub fn describe_create_table(
276    _: &StatementContext,
277    _: CreateTableStatement<Aug>,
278) -> Result<StatementDesc, PlanError> {
279    Ok(StatementDesc::new(None))
280}
281
/// Plans a `CREATE TABLE` statement into a [`Plan::CreateTable`].
///
/// Handles column definitions (declared types, `DEFAULT`, `NOT NULL`),
/// table- and column-level key constraints, columns added at later table
/// versions via `ColumnOption::Versioned`, and `WITH` options such as
/// `RETAIN HISTORY`. Returns an error if the (non-`IF NOT EXISTS`) name
/// collides with an existing catalog item or type.
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    // Versioned (later-added) columns, keyed by the table version that added them.
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    // A column-level UNIQUE/PRIMARY KEY is a single-column key.
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                // PRIMARY KEY implies NOT NULL on each member column.
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // Primary keys sort first so they are the relation's leading key.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Replay the versioned-column additions in version order; each addition must
    // land at exactly the version the statement recorded for it.
    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
501
502pub fn describe_create_table_from_source(
503    _: &StatementContext,
504    _: CreateTableFromSourceStatement<Aug>,
505) -> Result<StatementDesc, PlanError> {
506    Ok(StatementDesc::new(None))
507}
508
509pub fn describe_create_webhook_source(
510    _: &StatementContext,
511    _: CreateWebhookSourceStatement<Aug>,
512) -> Result<StatementDesc, PlanError> {
513    Ok(StatementDesc::new(None))
514}
515
516pub fn describe_create_source(
517    _: &StatementContext,
518    _: CreateSourceStatement<Aug>,
519) -> Result<StatementDesc, PlanError> {
520    Ok(StatementDesc::new(None))
521}
522
523pub fn describe_create_subsource(
524    _: &StatementContext,
525    _: CreateSubsourceStatement<Aug>,
526) -> Result<StatementDesc, PlanError> {
527    Ok(StatementDesc::new(None))
528}
529
// `WITH (...)` options accepted by `CREATE SOURCE`. `generate_extracted_config!`
// derives an `…OptionExtracted` struct that extracts and type-checks each option.
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Connection options for PostgreSQL sources.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Connection options for MySQL sources.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Connection options for SQL Server sources.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
557
/// Plans a `CREATE [TABLE|SOURCE] ... FROM WEBHOOK` statement.
///
/// Builds the relation description from the declared body format plus any
/// `INCLUDE HEADER(S)` columns, plans the optional `CHECK` (validation)
/// expression, and produces either a [`Plan::CreateTable`] (when
/// `is_table`) or a [`Plan::CreateSource`].
///
/// NOTE: ordering matters here — `in_cluster` must be resolved *before* the
/// statement is normalized into `create_sql`, because resolution may rewrite
/// `stmt.in_cluster`.
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Webhook *tables* are gated behind the source-tables feature flag.
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    // Only BYTES, JSON, and TEXT body formats are supported for webhooks.
    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        // The `headers` column is a map from header name to value.
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        // Split the filter list into allow-list and block-list entries.
        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            // Mapped header columns are nullable: a request may omit the header.
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // For the table variant, the cluster lives on the data source itself.
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}
755
/// Plans a `CREATE SOURCE` statement.
///
/// By the time this runs, purification must already have: cleared any
/// referenced subsources, resolved the (optional) progress subsource to a
/// named catalog item, and stashed connection details in the statement's
/// options.
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced all the options related to the primary
    // source output should be un-set.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    // An unspecified envelope defaults to `ENVELOPE NONE` (append-only).
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    // Only Kafka connections contribute extra metadata columns to the
    // primary relation; all other connections contribute none.
    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            // Only validate bounds for new statements (pcx is Some), not during
            // catalog deserialization (pcx is None). Previously persisted sources
            // may have intervals that no longer fall within the current bounds.
            if scx.pcx.is_some() {
                let min = scx.catalog.system_vars().min_timestamp_interval();
                let max = scx.catalog.system_vars().max_timestamp_interval();
                if duration < min || duration > max {
                    return Err(PlanError::InvalidTimestampInterval {
                        min,
                        max,
                        requested: duration,
                    });
                }
            }
            duration
        }
        None => scx.catalog.system_vars().default_timestamp_interval(),
    };

    // A named progress subsource marks the old source syntax
    // (`DataSourceDesc::OldSyntaxIngestion`); without one we plan a plain
    // ingestion whose relation is the connection's progress/timestamp desc.
    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1024
1025pub fn plan_generic_source_connection(
1026    scx: &StatementContext<'_>,
1027    source_connection: &CreateSourceConnection<Aug>,
1028    include_metadata: &Vec<SourceIncludeMetadata>,
1029) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1030    Ok(match source_connection {
1031        CreateSourceConnection::Kafka {
1032            connection,
1033            options,
1034        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1035            scx,
1036            connection,
1037            options,
1038            include_metadata,
1039        )?),
1040        CreateSourceConnection::Postgres {
1041            connection,
1042            options,
1043        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1044            scx, connection, options,
1045        )?),
1046        CreateSourceConnection::SqlServer {
1047            connection,
1048            options,
1049        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1050            scx, connection, options,
1051        )?),
1052        CreateSourceConnection::MySql {
1053            connection,
1054            options,
1055        } => {
1056            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1057        }
1058        CreateSourceConnection::LoadGenerator { generator, options } => {
1059            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1060                scx,
1061                generator,
1062                options,
1063                include_metadata,
1064            )?)
1065        }
1066    })
1067}
1068
1069fn plan_load_generator_source_connection(
1070    scx: &StatementContext<'_>,
1071    generator: &ast::LoadGenerator,
1072    options: &Vec<LoadGeneratorOption<Aug>>,
1073    include_metadata: &Vec<SourceIncludeMetadata>,
1074) -> Result<LoadGeneratorSourceConnection, PlanError> {
1075    let load_generator =
1076        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1077    let LoadGeneratorOptionExtracted {
1078        tick_interval,
1079        as_of,
1080        up_to,
1081        ..
1082    } = options.clone().try_into()?;
1083    let tick_micros = match tick_interval {
1084        Some(interval) => Some(interval.as_micros().try_into()?),
1085        None => None,
1086    };
1087    if up_to < as_of {
1088        sql_bail!("UP TO cannot be less than AS OF");
1089    }
1090    Ok(LoadGeneratorSourceConnection {
1091        load_generator,
1092        tick_micros,
1093        as_of,
1094        up_to,
1095    })
1096}
1097
1098fn plan_mysql_source_connection(
1099    scx: &StatementContext<'_>,
1100    connection: &ResolvedItemName,
1101    options: &Vec<MySqlConfigOption<Aug>>,
1102) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1103    let connection_item = scx.get_item_by_resolved_name(connection)?;
1104    match connection_item.connection()? {
1105        Connection::MySql(connection) => connection,
1106        _ => sql_bail!(
1107            "{} is not a MySQL connection",
1108            scx.catalog.resolve_full_name(connection_item.name())
1109        ),
1110    };
1111    let MySqlConfigOptionExtracted {
1112        details,
1113        // text/exclude columns are already part of the source-exports and are only included
1114        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1115        // be removed once we drop support for implicitly created subsources.
1116        text_columns: _,
1117        exclude_columns: _,
1118        seen: _,
1119    } = options.clone().try_into()?;
1120    let details = details
1121        .as_ref()
1122        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
1123    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1124    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1125    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1126    Ok(MySqlSourceConnection {
1127        connection: connection_item.id(),
1128        connection_id: connection_item.id(),
1129        details,
1130    })
1131}
1132
1133fn plan_sqlserver_source_connection(
1134    scx: &StatementContext<'_>,
1135    connection: &ResolvedItemName,
1136    options: &Vec<SqlServerConfigOption<Aug>>,
1137) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1138    let connection_item = scx.get_item_by_resolved_name(connection)?;
1139    match connection_item.connection()? {
1140        Connection::SqlServer(connection) => connection,
1141        _ => sql_bail!(
1142            "{} is not a SQL Server connection",
1143            scx.catalog.resolve_full_name(connection_item.name())
1144        ),
1145    };
1146    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1147    let details = details
1148        .as_ref()
1149        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
1150    let extras = hex::decode(details)
1151        .map_err(|e| sql_err!("{e}"))
1152        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1153        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1154    Ok(SqlServerSourceConnection {
1155        connection_id: connection_item.id(),
1156        connection: connection_item.id(),
1157        extras,
1158    })
1159}
1160
1161fn plan_postgres_source_connection(
1162    scx: &StatementContext<'_>,
1163    connection: &ResolvedItemName,
1164    options: &Vec<PgConfigOption<Aug>>,
1165) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1166    let connection_item = scx.get_item_by_resolved_name(connection)?;
1167    let PgConfigOptionExtracted {
1168        details,
1169        publication,
1170        // text columns are already part of the source-exports and are only included
1171        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1172        // be removed once we drop support for implicitly created subsources.
1173        text_columns: _,
1174        // exclude columns are already part of the source-exports and are only included
1175        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1176        // be removed once we drop support for implicitly created subsources.
1177        exclude_columns: _,
1178        seen: _,
1179    } = options.clone().try_into()?;
1180    let details = details
1181        .as_ref()
1182        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
1183    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1184    let details =
1185        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1186    let publication_details =
1187        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1188    Ok(PostgresSourceConnection {
1189        connection: connection_item.id(),
1190        connection_id: connection_item.id(),
1191        publication: publication.expect("validated exists during purification"),
1192        publication_details,
1193    })
1194}
1195
1196fn plan_kafka_source_connection(
1197    scx: &StatementContext<'_>,
1198    connection_name: &ResolvedItemName,
1199    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1200    include_metadata: &Vec<SourceIncludeMetadata>,
1201) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1202    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1203    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1204        sql_bail!(
1205            "{} is not a kafka connection",
1206            scx.catalog.resolve_full_name(connection_item.name())
1207        )
1208    }
1209    let KafkaSourceConfigOptionExtracted {
1210        group_id_prefix,
1211        topic,
1212        topic_metadata_refresh_interval,
1213        start_timestamp: _, // purified into `start_offset`
1214        start_offset,
1215        seen: _,
1216    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1217    let topic = topic.expect("validated exists during purification");
1218    let mut start_offsets = BTreeMap::new();
1219    if let Some(offsets) = start_offset {
1220        for (part, offset) in offsets.iter().enumerate() {
1221            if *offset < 0 {
1222                sql_bail!("START OFFSET must be a nonnegative integer");
1223            }
1224            start_offsets.insert(i32::try_from(part)?, *offset);
1225        }
1226    }
1227    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1228        // This is a librdkafka-enforced restriction that, if violated,
1229        // would result in a runtime error for the source.
1230        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1231    }
1232    let metadata_columns = include_metadata
1233        .into_iter()
1234        .flat_map(|item| match item {
1235            SourceIncludeMetadata::Timestamp { alias } => {
1236                let name = match alias {
1237                    Some(name) => name.to_string(),
1238                    None => "timestamp".to_owned(),
1239                };
1240                Some((name, KafkaMetadataKind::Timestamp))
1241            }
1242            SourceIncludeMetadata::Partition { alias } => {
1243                let name = match alias {
1244                    Some(name) => name.to_string(),
1245                    None => "partition".to_owned(),
1246                };
1247                Some((name, KafkaMetadataKind::Partition))
1248            }
1249            SourceIncludeMetadata::Offset { alias } => {
1250                let name = match alias {
1251                    Some(name) => name.to_string(),
1252                    None => "offset".to_owned(),
1253                };
1254                Some((name, KafkaMetadataKind::Offset))
1255            }
1256            SourceIncludeMetadata::Headers { alias } => {
1257                let name = match alias {
1258                    Some(name) => name.to_string(),
1259                    None => "headers".to_owned(),
1260                };
1261                Some((name, KafkaMetadataKind::Headers))
1262            }
1263            SourceIncludeMetadata::Header {
1264                alias,
1265                key,
1266                use_bytes,
1267            } => Some((
1268                alias.to_string(),
1269                KafkaMetadataKind::Header {
1270                    key: key.clone(),
1271                    use_bytes: *use_bytes,
1272                },
1273            )),
1274            SourceIncludeMetadata::Key { .. } => {
1275                // handled below
1276                None
1277            }
1278        })
1279        .collect();
1280    Ok(KafkaSourceConnection {
1281        connection: connection_item.id(),
1282        connection_id: connection_item.id(),
1283        topic,
1284        start_offsets,
1285        group_id_prefix,
1286        topic_metadata_refresh_interval,
1287        metadata_columns,
1288    })
1289}
1290
/// Combines the source's envelope and (optional) format with the connection's
/// default key/value relation descriptions, producing the relation description
/// of the source's primary output, the planned [`SourceEnvelope`], and the
/// data encoding (`None` when no format was specified).
///
/// `metadata_columns_desc` lists connection-provided metadata columns that are
/// appended after the value columns.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    // With an encoding, the key/value descs come from the format; without
    // one, the connection's defaults pass through unchanged.
    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is over conservative. For
            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // KEY VALUE load generators are the only UPSERT source that
    // has no encoding but defaults to `INCLUDE KEY`.
    //
    // As discussed
    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
    // removing this special case amounts to deciding how to handle null keys
    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
    // this special case in.
    //
    // Note that this is safe because this generator is
    // 1. The only source with no encoding that can have its key included.
    // 2. Never produces null keys (or values, for that matter).
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    // Not all source envelopes are compatible with all source connections.
    // Whoever constructs the source ingestion pipeline is responsible for
    // choosing compatible envelopes and connections.
    //
    // TODO(guswynn): ambiguously assert which connections and envelopes are
    // compatible in typechecking
    //
    // TODO: remove bails as more support for upsert is added.
    let envelope = match &envelope {
        // TODO: fixup key envelope
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            //TODO check that key envelope is not set
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
            // specified.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // If the value decode error policy is not set we use the default upsert style.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            //TODO check that key envelope is not set
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    // Fold the connection-level metadata columns into the final relation.
    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1469
1470/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1471/// of columns and constraints.
1472fn plan_source_export_desc(
1473    scx: &StatementContext,
1474    name: &UnresolvedItemName,
1475    columns: &Vec<ColumnDef<Aug>>,
1476    constraints: &Vec<TableConstraint<Aug>>,
1477) -> Result<RelationDesc, PlanError> {
1478    let names: Vec<_> = columns
1479        .iter()
1480        .map(|c| normalize::column_name(c.name.clone()))
1481        .collect();
1482
1483    if let Some(dup) = names.iter().duplicates().next() {
1484        sql_bail!("column {} specified more than once", dup.quoted());
1485    }
1486
1487    // Build initial relation type that handles declared data types
1488    // and NOT NULL constraints.
1489    let mut column_types = Vec::with_capacity(columns.len());
1490    let mut keys = Vec::new();
1491
1492    for (i, c) in columns.into_iter().enumerate() {
1493        let aug_data_type = &c.data_type;
1494        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1495        let mut nullable = true;
1496        for option in &c.options {
1497            match &option.option {
1498                ColumnOption::NotNull => nullable = false,
1499                ColumnOption::Default(_) => {
1500                    bail_unsupported!("Source export with default value")
1501                }
1502                ColumnOption::Unique { is_primary } => {
1503                    keys.push(vec![i]);
1504                    if *is_primary {
1505                        nullable = false;
1506                    }
1507                }
1508                other => {
1509                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1510                }
1511            }
1512        }
1513        column_types.push(ty.nullable(nullable));
1514    }
1515
1516    let mut seen_primary = false;
1517    'c: for constraint in constraints {
1518        match constraint {
1519            TableConstraint::Unique {
1520                name: _,
1521                columns,
1522                is_primary,
1523                nulls_not_distinct,
1524            } => {
1525                if seen_primary && *is_primary {
1526                    sql_bail!(
1527                        "multiple primary keys for source export {} are not allowed",
1528                        name.to_ast_string_stable()
1529                    );
1530                }
1531                seen_primary = *is_primary || seen_primary;
1532
1533                let mut key = vec![];
1534                for column in columns {
1535                    let column = normalize::column_name(column.clone());
1536                    match names.iter().position(|name| *name == column) {
1537                        None => sql_bail!("unknown column in constraint: {}", column),
1538                        Some(i) => {
1539                            let nullable = &mut column_types[i].nullable;
1540                            if *is_primary {
1541                                if *nulls_not_distinct {
1542                                    sql_bail!(
1543                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1544                                    );
1545                                }
1546                                *nullable = false;
1547                            } else if !(*nulls_not_distinct || !*nullable) {
1548                                // Non-primary key unique constraints are only keys if all of their
1549                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1550                                break 'c;
1551                            }
1552
1553                            key.push(i);
1554                        }
1555                    }
1556                }
1557
1558                if *is_primary {
1559                    keys.insert(0, key);
1560                } else {
1561                    keys.push(key);
1562                }
1563            }
1564            TableConstraint::ForeignKey { .. } => {
1565                bail_unsupported!("Source export with a foreign key")
1566            }
1567            TableConstraint::Check { .. } => {
1568                bail_unsupported!("Source export with a check constraint")
1569            }
1570        }
1571    }
1572
1573    let typ = SqlRelationType::new(column_types).with_keys(keys);
1574    let desc = RelationDesc::new(typ, names);
1575    Ok(desc)
1576}
1577
// Generates `CreateSubsourceOptionExtracted`, which collects the values of the
// `WITH (...)` options accepted on `CREATE SUBSOURCE` statements; consumed by
// `plan_create_subsource` below.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1587
/// Plans a `CREATE SUBSOURCE` statement into a `Plan::CreateSource`.
///
/// A subsource is either a source's progress collection (the `PROGRESS`
/// option) or an export of a single external object from an existing source
/// (`OF SOURCE ... WITH (EXTERNAL REFERENCE ...)`). These statements are
/// generated internally during `CREATE SOURCE` purification rather than
/// written directly by users, so malformed combinations of options are
/// treated as internal errors (assert/panic) instead of user errors.
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // This invariant is enforced during purification; we are responsible for
    // creating the AST for subsources as a response to CREATE SOURCE
    // statements, so this would fire in integration testing if we failed to
    // uphold it.
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    // The declared columns/constraints become the subsource's relation schema.
    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // If the new source table syntax is forced we should not be creating any non-progress
        // subsources.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        // This is a subsource with the "natural" dependency order, i.e. it is
        // not a legacy subsource with the inverted structure.
        let ingestion_id = *source_reference.item_id();
        // Purification sets EXTERNAL REFERENCE alongside OF SOURCE; absence
        // here would be an internal bug, hence the unwrap.
        let external_reference = external_reference.unwrap();

        // Decode the details option stored on the subsource statement, which contains information
        // created during the purification process.
        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        // Convert the decoded, source-type-specific statement details into the
        // in-memory `SourceExportDetails` representation, folding in the
        // TEXT COLUMNS / EXCLUDE COLUMNS options where the source type uses them.
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources don't currently support non-default envelopes / encoding
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Canonicalize the statement text for storage in the catalog.
    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}
1732
// Generates `TableFromSourceOptionExtracted`, which collects the values of the
// `WITH (...)` options accepted on `CREATE TABLE ... FROM SOURCE` statements;
// consumed by `plan_create_table_from_source` below.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1741
/// Plans a `CREATE TABLE ... FROM SOURCE` statement into a `Plan::CreateTable`.
///
/// The resulting table's data source is a `DataSourceDesc::IngestionExport` of
/// the referenced source, i.e. the table is populated by the source's
/// ingestion rather than by user writes. The external reference and the
/// hex-encoded `DETAILS` option are populated during purification.
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    // An unspecified envelope is treated as ENVELOPE NONE.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // Decode the details option stored on the statement, which contains information
    // created during the purification process.
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    // INCLUDE HEADERS only makes sense for Kafka; other INCLUDE metadata items
    // are additionally permitted for load generators (validated just below).
    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    // Convert the decoded, source-type-specific statement details into the
    // in-memory `SourceExportDetails` representation, folding in the
    // TEXT COLUMNS / EXCLUDE COLUMNS options where the source type uses them.
    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                // TODO(guswynn): should this be `bail_unsupported!`?
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Map each INCLUDE item to an output column name (the alias if
            // given, otherwise the item's default name) and its metadata kind.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // handled below
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    // Some source-types (e.g. postgres, mysql, multi-output load-gen sources) define a value_schema
    // during purification and define the `columns` and `constraints` fields for the statement,
    // whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
    // we use the source connection's default schema.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Apply the envelope and FORMAT to the key/value descs to produce the
    // table's final relation desc plus the planned envelope and encoding.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    // Renaming and metadata columns may have introduced duplicates; re-check.
    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Allow users to specify a timeline. If they do not, determine a default
    // timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    // Canonicalize the statement text for storage in the catalog.
    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2005
// Generates `LoadGeneratorOptionExtracted`, which collects the options accepted
// by load generator sources; which options are valid for which generator
// variant is enforced by `ensure_only_valid_options` below.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2021
2022impl LoadGeneratorOptionExtracted {
2023    pub(super) fn ensure_only_valid_options(
2024        &self,
2025        loadgen: &ast::LoadGenerator,
2026    ) -> Result<(), PlanError> {
2027        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2028
2029        let mut options = self.seen.clone();
2030
2031        let permitted_options: &[_] = match loadgen {
2032            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2033            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2034            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2035            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2036            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2037            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2038            ast::LoadGenerator::KeyValue => &[
2039                TickInterval,
2040                Keys,
2041                SnapshotRounds,
2042                TransactionalSnapshot,
2043                ValueSize,
2044                Seed,
2045                Partitions,
2046                BatchSize,
2047            ],
2048        };
2049
2050        for o in permitted_options {
2051            options.remove(o);
2052        }
2053
2054        if !options.is_empty() {
2055            sql_bail!(
2056                "{} load generators do not support {} values",
2057                loadgen,
2058                options.iter().join(", ")
2059            )
2060        }
2061
2062        Ok(())
2063    }
2064}
2065
/// Converts the AST representation of a load generator, together with its
/// options and `INCLUDE` metadata items, into the planned [`LoadGenerator`].
///
/// Validates that only options applicable to the chosen generator variant were
/// provided, that `INCLUDE` metadata is only used with `KEY VALUE`, and that
/// the `KEY VALUE` generator's numeric options are mutually consistent.
pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    extracted.ensure_only_valid_options(loadgen)?;

    // Only the KEY VALUE generator understands INCLUDE items (OFFSET/KEY below).
    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
            LoadGenerator::Datums
        }
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            // Default to 0.01 scale factor (=10MB).
            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            // Scale a TPC-H base row count by `sf`, clamping to at least 1 row.
            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

            // The multiplications here are safely unchecked because they will
            // overflow to infinity, which will be caught by f64_to_i64.
            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            // INCLUDE OFFSET selects an output column name for the offset;
            // INCLUDE KEY is accepted but handled elsewhere.
            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,

                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
                    }
                };
            }

            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                // Defaults to true.
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("KEYS must be larger than BATCH SIZE")
            }

            // This constraints simplifies the source implementation.
            // We can lift it later.
            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}
2212
2213fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2214    let before = value_desc.get_by_name(&"before".into());
2215    let (after_idx, after_ty) = value_desc
2216        .get_by_name(&"after".into())
2217        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2218    let before_idx = if let Some((before_idx, before_ty)) = before {
2219        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2220            sql_bail!("'before' column must be of type record");
2221        }
2222        if before_ty != after_ty {
2223            sql_bail!("'before' type differs from 'after' column");
2224        }
2225        Some(before_idx)
2226    } else {
2227        None
2228    };
2229    Ok((before_idx, after_idx))
2230}
2231
2232fn get_encoding(
2233    scx: &StatementContext,
2234    format: &FormatSpecifier<Aug>,
2235    envelope: &ast::SourceEnvelope,
2236) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2237    let encoding = match format {
2238        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2239        FormatSpecifier::KeyValue { key, value } => {
2240            let key = {
2241                let encoding = get_encoding_inner(scx, key)?;
2242                Some(encoding.key.unwrap_or(encoding.value))
2243            };
2244            let value = get_encoding_inner(scx, value)?.value;
2245            SourceDataEncoding { key, value }
2246        }
2247    };
2248
2249    let requires_keyvalue = matches!(
2250        envelope,
2251        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2252    );
2253    let is_keyvalue = encoding.key.is_some();
2254    if requires_keyvalue && !is_keyvalue {
2255        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2256    };
2257
2258    Ok(encoding)
2259}
2260
2261/// Determine the cluster ID to use for this item.
2262///
2263/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2264/// Because of this, do not normalize/canonicalize the create SQL statement
2265/// until after calling this function.
2266fn source_sink_cluster_config<'a, 'ctx>(
2267    scx: &'a StatementContext<'ctx>,
2268    in_cluster: &mut Option<ResolvedClusterName>,
2269) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2270    let cluster = match in_cluster {
2271        None => {
2272            let cluster = scx.catalog.resolve_cluster(None)?;
2273            *in_cluster = Some(ResolvedClusterName {
2274                id: cluster.id(),
2275                print_name: None,
2276            });
2277            cluster
2278        }
2279        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2280    };
2281
2282    Ok(cluster)
2283}
2284
// Generates `AvroSchemaOptionExtracted`, which collects the `WITH` options
// accepted on an inline Avro schema (currently only `CONFLUENT WIRE FORMAT`,
// defaulting to `true`).
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2286
/// An Avro schema (plus associated metadata) extracted from a `FORMAT AVRO`
/// clause, either specified inline or resolved from a schema registry seed.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, if any. `None` for inline schemas.
    pub key_schema: Option<String>,
    /// The value schema.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    pub value_reference_schemas: Vec<String>,
    /// The schema registry connection the schemas came from, if any.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether payloads use the Confluent wire format framing.
    pub confluent_wire_format: bool,
}
2298
/// Resolves a single bare [`Format`] to a [`SourceDataEncoding`].
///
/// Most formats describe only a value encoding (`key: None`); Avro and
/// Protobuf schema-registry formats whose purified seed carries a key schema
/// return early with both key and value encodings.
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                key_reference_schemas,
                value_reference_schemas,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                // TODO(jldlaughlin): we need a way to pass in primary key information
                // when building a source from a string or file.
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    // Inline schemas have no key and no registry connection.
                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    // The referenced connection must be a schema registry
                    // connection; anything else is a user error.
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            key_reference_schemas: seed.key_reference_schemas.clone(),
                            value_reference_schemas: seed.value_reference_schemas.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            // A key schema means this format describes both key and value, so
            // return early with a keyed encoding.
            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        reference_schemas: key_reference_schemas,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        reference_schemas: value_reference_schemas,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    reference_schemas: value_reference_schemas,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    // Validate that the connection is a schema registry
                    // connection; the resolved connection itself is unused
                    // because the seed already carries the descriptors.
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    // A key descriptor in the seed yields a keyed encoding.
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                // Inline Protobuf schemas are raw descriptor bytes.
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    // Purification fills in header names; an empty list here
                    // indicates an internal error upstream.
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                // The delimiter must fit in a single byte (ASCII).
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}
2488
2489/// Extract the key envelope, if it is requested
2490fn get_key_envelope(
2491    included_items: &[SourceIncludeMetadata],
2492    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2493    key_envelope_no_encoding: bool,
2494) -> Result<KeyEnvelope, PlanError> {
2495    let key_definition = included_items
2496        .iter()
2497        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2498    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2499        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2500            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2501            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2502            (Some(name), _) if key_envelope_no_encoding => {
2503                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2504            }
2505            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2506            (_, None) => {
2507                // `kd.alias` == `None` means `INCLUDE KEY`
2508                // `kd.alias` == `Some(_) means INCLUDE KEY AS ___`
2509                // These both make sense with the same error message
2510                sql_bail!(
2511                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2512                        got bare FORMAT"
2513                );
2514            }
2515        }
2516    } else {
2517        Ok(KeyEnvelope::None)
2518    }
2519}
2520
2521/// Gets the key envelope for a given key encoding when no name for the key has
2522/// been requested by the user.
2523fn get_unnamed_key_envelope(
2524    key: Option<&DataEncoding<ReferencedConnection>>,
2525) -> Result<KeyEnvelope, PlanError> {
2526    // If the key is requested but comes from an unnamed type then it gets the name "key"
2527    //
2528    // Otherwise it gets the names of the columns in the type
2529    let is_composite = match key {
2530        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2531        Some(
2532            DataEncoding::Avro(_)
2533            | DataEncoding::Csv(_)
2534            | DataEncoding::Protobuf(_)
2535            | DataEncoding::Regex { .. },
2536        ) => true,
2537        None => false,
2538    };
2539
2540    if is_composite {
2541        Ok(KeyEnvelope::Flattened)
2542    } else {
2543        Ok(KeyEnvelope::Named("key".to_string()))
2544    }
2545}
2546
/// Describes `CREATE VIEW`; the statement produces no result rows.
pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2553
/// Plans the shared parts of `CREATE [TEMPORARY] VIEW`: captures the
/// normalized create SQL, plans the underlying query, applies column renames,
/// and rejects duplicate column names.
///
/// Returns the qualified name the view will be created under, together with
/// the planned [`View`].
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    // Reconstruct the create SQL from the definition, pinning `if_exists` to
    // `Error` in the stored statement.
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
    // Note: Earlier, we were thinking to maybe persist the finishing information with the view
    // here to help with database-issues#236. However, in the meantime, there might be a better
    // approach to solve database-issues#236:
    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    // Views may not contain unbound parameters (e.g. `$1`).
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    // Map the dependencies' global IDs back to catalog item IDs.
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Temporary views get a temporary qualified name; everything else a
    // regular one.
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    // Reject duplicate column names (possible after renaming).
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}
2626
/// Plans `CREATE VIEW`, including `OR REPLACE` and `IF NOT EXISTS` handling.
pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    // Override the statement-level IfExistsBehavior with Skip if this is
    // explicitly requested in the PlanContext (the default is `false`).
    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    // For `OR REPLACE`, plan the drop of the existing view (if any).
    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        // Check if the new View depends on the item that we would be replacing.
        if let Some(id) = maybe_item_to_drop {
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    // Everything that transitively depends on the replaced view gets dropped
    // along with it.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    validate_view_dependencies(scx, &view.dependencies.0)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating views when
    // there is an existing object *or* type of the same name.
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
2712
2713/// Validate the dependencies of a (materialized) view.
2714fn validate_view_dependencies(
2715    scx: &StatementContext,
2716    dependencies: &BTreeSet<CatalogItemId>,
2717) -> Result<(), PlanError> {
2718    for id in dependencies {
2719        let item = scx.catalog.get_item(id);
2720        if item.replacement_target().is_some() {
2721            let name = scx.catalog.minimal_qualification(item.name());
2722            return Err(PlanError::InvalidDependency {
2723                name: name.to_string(),
2724                item_type: format!("replacement {}", item.item_type()),
2725            });
2726        }
2727    }
2728
2729    Ok(())
2730}
2731
/// Describes `CREATE MATERIALIZED VIEW`; the statement produces no result rows.
pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2738
/// Describes `CREATE CONTINUAL TASK`; the statement produces no result rows.
pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2745
/// Describes `CREATE NETWORK POLICY`; the statement produces no result rows.
pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2752
/// Describes `ALTER NETWORK POLICY`; the statement produces no result rows.
pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2759
2760pub fn plan_create_materialized_view(
2761    scx: &StatementContext,
2762    mut stmt: CreateMaterializedViewStatement<Aug>,
2763) -> Result<Plan, PlanError> {
2764    let cluster_id =
2765        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2766    stmt.in_cluster = Some(ResolvedClusterName {
2767        id: cluster_id,
2768        print_name: None,
2769    });
2770
2771    let target_replica = match &stmt.in_cluster_replica {
2772        Some(replica_name) => {
2773            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;
2774
2775            let cluster = scx.catalog.get_cluster(cluster_id);
2776            let replica_id = cluster
2777                .replica_ids()
2778                .get(replica_name.as_str())
2779                .copied()
2780                .ok_or_else(|| {
2781                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
2782                })?;
2783            Some(replica_id)
2784        }
2785        None => None,
2786    };
2787
2788    let create_sql =
2789        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2790
2791    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2792    let name = scx.allocate_qualified_name(partial_name.clone())?;
2793
2794    let query::PlannedRootQuery {
2795        expr,
2796        mut desc,
2797        finishing,
2798        scope: _,
2799    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2800    // We get back a trivial finishing, see comment in `plan_view`.
2801    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2802        &finishing,
2803        expr.arity()
2804    ));
2805    if expr.contains_parameters()? {
2806        return Err(PlanError::ParameterNotAllowed(
2807            "materialized views".to_string(),
2808        ));
2809    }
2810
2811    plan_utils::maybe_rename_columns(
2812        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2813        &mut desc,
2814        &stmt.columns,
2815    )?;
2816    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2817
2818    let MaterializedViewOptionExtracted {
2819        assert_not_null,
2820        partition_by,
2821        retain_history,
2822        refresh,
2823        seen: _,
2824    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2825
2826    if let Some(partition_by) = partition_by {
2827        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2828        check_partition_by(&desc, partition_by)?;
2829    }
2830
2831    let refresh_schedule = {
2832        let mut refresh_schedule = RefreshSchedule::default();
2833        let mut on_commits_seen = 0;
2834        for refresh_option_value in refresh {
2835            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2836                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2837            }
2838            match refresh_option_value {
2839                RefreshOptionValue::OnCommit => {
2840                    on_commits_seen += 1;
2841                }
2842                RefreshOptionValue::AtCreation => {
2843                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2844                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2845                }
2846                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2847                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2848                    let ecx = &ExprContext {
2849                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2850                        name: "REFRESH AT",
2851                        scope: &Scope::empty(),
2852                        relation_type: &SqlRelationType::empty(),
2853                        allow_aggregates: false,
2854                        allow_subqueries: false,
2855                        allow_parameters: false,
2856                        allow_windows: false,
2857                    };
2858                    let hir = plan_expr(ecx, &time)?.cast_to(
2859                        ecx,
2860                        CastContext::Assignment,
2861                        &SqlScalarType::MzTimestamp,
2862                    )?;
2863                    // (mz_now was purified away to a literal earlier)
2864                    let timestamp = hir
2865                        .into_literal_mz_timestamp()
2866                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2867                    refresh_schedule.ats.push(timestamp);
2868                }
2869                RefreshOptionValue::Every(RefreshEveryOptionValue {
2870                    interval,
2871                    aligned_to,
2872                }) => {
2873                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2874                    if interval.as_microseconds() <= 0 {
2875                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2876                    }
2877                    if interval.months != 0 {
2878                        // This limitation is because we want Intervals to be cleanly convertable
2879                        // to a unix epoch timestamp difference. When the interval involves months, then
2880                        // this is not true anymore, because months have variable lengths.
2881                        // See `Timestamp::round_up`.
2882                        sql_bail!("REFRESH interval must not involve units larger than days");
2883                    }
2884                    let interval = interval.duration()?;
2885                    if u64::try_from(interval.as_millis()).is_err() {
2886                        sql_bail!("REFRESH interval too large");
2887                    }
2888
2889                    let mut aligned_to = match aligned_to {
2890                        Some(aligned_to) => aligned_to,
2891                        None => {
2892                            soft_panic_or_log!(
2893                                "ALIGNED TO should have been filled in by purification"
2894                            );
2895                            sql_bail!(
2896                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2897                            )
2898                        }
2899                    };
2900
2901                    // Desugar the `aligned_to` expression
2902                    transform_ast::transform(scx, &mut aligned_to)?;
2903
2904                    let ecx = &ExprContext {
2905                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2906                        name: "REFRESH EVERY ... ALIGNED TO",
2907                        scope: &Scope::empty(),
2908                        relation_type: &SqlRelationType::empty(),
2909                        allow_aggregates: false,
2910                        allow_subqueries: false,
2911                        allow_parameters: false,
2912                        allow_windows: false,
2913                    };
2914                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2915                        ecx,
2916                        CastContext::Assignment,
2917                        &SqlScalarType::MzTimestamp,
2918                    )?;
2919                    // (mz_now was purified away to a literal earlier)
2920                    let aligned_to_const = aligned_to_hir
2921                        .into_literal_mz_timestamp()
2922                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2923
2924                    refresh_schedule.everies.push(RefreshEvery {
2925                        interval,
2926                        aligned_to: aligned_to_const,
2927                    });
2928                }
2929            }
2930        }
2931
2932        if on_commits_seen > 1 {
2933            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2934        }
2935        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2936            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2937        }
2938
2939        if refresh_schedule == RefreshSchedule::default() {
2940            None
2941        } else {
2942            Some(refresh_schedule)
2943        }
2944    };
2945
2946    let as_of = stmt.as_of.map(Timestamp::from);
2947    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2948    let mut non_null_assertions = assert_not_null
2949        .into_iter()
2950        .map(normalize::column_name)
2951        .map(|assertion_name| {
2952            column_names
2953                .iter()
2954                .position(|col| col == &assertion_name)
2955                .ok_or_else(|| {
2956                    sql_err!(
2957                        "column {} in ASSERT NOT NULL option not found",
2958                        assertion_name.quoted()
2959                    )
2960                })
2961        })
2962        .collect::<Result<Vec<_>, _>>()?;
2963    non_null_assertions.sort();
2964    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2965        let dup = &column_names[*dup];
2966        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2967    }
2968
2969    if let Some(dup) = column_names.iter().duplicates().next() {
2970        sql_bail!("column {} specified more than once", dup.quoted());
2971    }
2972
2973    // Override the statement-level IfExistsBehavior with Skip if this is
2974    // explicitly requested in the PlanContext (the default is `false`).
2975    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2976        Ok(true) => IfExistsBehavior::Skip,
2977        _ => stmt.if_exists,
2978    };
2979
2980    let mut replace = None;
2981    let mut if_not_exists = false;
2982    match if_exists {
2983        IfExistsBehavior::Replace => {
2984            let if_exists = true;
2985            let cascade = false;
2986            let replace_id = plan_drop_item(
2987                scx,
2988                ObjectType::MaterializedView,
2989                if_exists,
2990                partial_name.clone().into(),
2991                cascade,
2992            )?;
2993
2994            // Check if the new Materialized View depends on the item that we would be replacing.
2995            if let Some(id) = replace_id {
2996                let dependencies = expr.depends_on();
2997                let invalid_drop = scx
2998                    .get_item(&id)
2999                    .global_ids()
3000                    .any(|gid| dependencies.contains(&gid));
3001                if invalid_drop {
3002                    let item = scx.catalog.get_item(&id);
3003                    sql_bail!(
3004                        "cannot replace materialized view {0}: depended upon by new {0} definition",
3005                        scx.catalog.resolve_full_name(item.name())
3006                    );
3007                }
3008                replace = Some(id);
3009            }
3010        }
3011        IfExistsBehavior::Skip => if_not_exists = true,
3012        IfExistsBehavior::Error => (),
3013    }
3014    let drop_ids = replace
3015        .map(|id| {
3016            scx.catalog
3017                .item_dependents(id)
3018                .into_iter()
3019                .map(|id| id.unwrap_item_id())
3020                .collect()
3021        })
3022        .unwrap_or_default();
3023    let mut dependencies: BTreeSet<_> = expr
3024        .depends_on()
3025        .into_iter()
3026        .map(|gid| scx.catalog.resolve_item_id(&gid))
3027        .collect();
3028
3029    // Validate the replacement target, if one is given.
3030    let mut replacement_target = None;
3031    if let Some(target_name) = &stmt.replacement_for {
3032        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
3033
3034        let target = scx.get_item_by_resolved_name(target_name)?;
3035        if target.item_type() != CatalogItemType::MaterializedView {
3036            return Err(PlanError::InvalidReplacement {
3037                item_type: target.item_type(),
3038                item_name: scx.catalog.minimal_qualification(target.name()),
3039                replacement_type: CatalogItemType::MaterializedView,
3040                replacement_name: partial_name,
3041            });
3042        }
3043        if target.id().is_system() {
3044            sql_bail!(
3045                "cannot replace {} because it is required by the database system",
3046                scx.catalog.minimal_qualification(target.name()),
3047            );
3048        }
3049
3050        // Check for dependency cycles.
3051        for dependent in scx.catalog.item_dependents(target.id()) {
3052            if let ObjectId::Item(id) = dependent
3053                && dependencies.contains(&id)
3054            {
3055                sql_bail!(
3056                    "replacement would cause {} to depend on itself",
3057                    scx.catalog.minimal_qualification(target.name()),
3058                );
3059            }
3060        }
3061
3062        dependencies.insert(target.id());
3063
3064        for use_id in target.used_by() {
3065            let use_item = scx.get_item(use_id);
3066            if use_item.replacement_target() == Some(target.id()) {
3067                sql_bail!(
3068                    "cannot replace {} because it already has a replacement: {}",
3069                    scx.catalog.minimal_qualification(target.name()),
3070                    scx.catalog.minimal_qualification(use_item.name()),
3071                );
3072            }
3073        }
3074
3075        replacement_target = Some(target.id());
3076    }
3077
3078    validate_view_dependencies(scx, &dependencies)?;
3079
3080    // Check for an object in the catalog with this same name
3081    let full_name = scx.catalog.resolve_full_name(&name);
3082    let partial_name = PartialItemName::from(full_name.clone());
3083    // For PostgreSQL compatibility, we need to prevent creating materialized
3084    // views when there is an existing object *or* type of the same name.
3085    if let (IfExistsBehavior::Error, Ok(item)) =
3086        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3087    {
3088        return Err(PlanError::ItemAlreadyExists {
3089            name: full_name.to_string(),
3090            item_type: item.item_type(),
3091        });
3092    }
3093
3094    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3095        name,
3096        materialized_view: MaterializedView {
3097            create_sql,
3098            expr,
3099            dependencies: DependencyIds(dependencies),
3100            column_names,
3101            replacement_target,
3102            cluster_id,
3103            target_replica,
3104            non_null_assertions,
3105            compaction_window,
3106            refresh_schedule,
3107            as_of,
3108        },
3109        replace,
3110        drop_ids,
3111        if_not_exists,
3112        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3113    }))
3114}
3115
// `WITH`/inline options accepted by `CREATE MATERIALIZED VIEW`; generates
// `MaterializedViewOptionExtracted` with duplicate-option detection.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3123
3124pub fn plan_create_continual_task(
3125    scx: &StatementContext,
3126    mut stmt: CreateContinualTaskStatement<Aug>,
3127) -> Result<Plan, PlanError> {
3128    match &stmt.sugar {
3129        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3130        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3131            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3132        }
3133        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3134            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3135        }
3136    };
3137    let cluster_id = match &stmt.in_cluster {
3138        None => scx.catalog.resolve_cluster(None)?.id(),
3139        Some(in_cluster) => in_cluster.id,
3140    };
3141    stmt.in_cluster = Some(ResolvedClusterName {
3142        id: cluster_id,
3143        print_name: None,
3144    });
3145
3146    let create_sql =
3147        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3148
3149    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3150
3151    // It seems desirable for a CT that e.g. simply filters the input to keep
3152    // the same nullability. So, start by assuming all columns are non-nullable,
3153    // and then make them nullable below if any of the exprs plan them as
3154    // nullable.
3155    let mut desc = match stmt.columns {
3156        None => None,
3157        Some(columns) => {
3158            let mut desc_columns = Vec::with_capacity(columns.capacity());
3159            for col in columns.iter() {
3160                desc_columns.push((
3161                    normalize::column_name(col.name.clone()),
3162                    SqlColumnType {
3163                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3164                        nullable: false,
3165                    },
3166                ));
3167            }
3168            Some(RelationDesc::from_names_and_types(desc_columns))
3169        }
3170    };
3171    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3172    match input.item_type() {
3173        // Input must be a thing directly backed by a persist shard, so we can
3174        // use a persist listen to efficiently rehydrate.
3175        CatalogItemType::ContinualTask
3176        | CatalogItemType::Table
3177        | CatalogItemType::MaterializedView
3178        | CatalogItemType::Source => {}
3179        CatalogItemType::Sink
3180        | CatalogItemType::View
3181        | CatalogItemType::Index
3182        | CatalogItemType::Type
3183        | CatalogItemType::Func
3184        | CatalogItemType::Secret
3185        | CatalogItemType::Connection => {
3186            sql_bail!(
3187                "CONTINUAL TASK cannot use {} as an input",
3188                input.item_type()
3189            );
3190        }
3191    }
3192
3193    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3194    let ct_name = stmt.name;
3195    let placeholder_id = match &ct_name {
3196        ResolvedItemName::ContinualTask { id, name } => {
3197            let desc = match desc.as_ref().cloned() {
3198                Some(x) => x,
3199                None => {
3200                    // The user didn't specify the CT's columns. Take a wild
3201                    // guess that the CT has the same shape as the input. It's
3202                    // fine if this is wrong, we'll get an error below after
3203                    // planning the query.
3204                    let desc = input.relation_desc().expect("item type checked above");
3205                    desc.into_owned()
3206                }
3207            };
3208            qcx.ctes.insert(
3209                *id,
3210                CteDesc {
3211                    name: name.item.clone(),
3212                    desc,
3213                },
3214            );
3215            Some(*id)
3216        }
3217        _ => None,
3218    };
3219
3220    let mut exprs = Vec::new();
3221    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3222        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3223        let query::PlannedRootQuery {
3224            expr,
3225            desc: desc_query,
3226            finishing,
3227            scope: _,
3228        } = query::plan_ct_query(&mut qcx, query)?;
3229        // We get back a trivial finishing because we plan with a "maintained"
3230        // QueryLifetime, see comment in `plan_view`.
3231        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3232            &finishing,
3233            expr.arity()
3234        ));
3235        if expr.contains_parameters()? {
3236            if expr.contains_parameters()? {
3237                return Err(PlanError::ParameterNotAllowed(
3238                    "continual tasks".to_string(),
3239                ));
3240            }
3241        }
3242        let expr = match desc.as_mut() {
3243            None => {
3244                desc = Some(desc_query);
3245                expr
3246            }
3247            Some(desc) => {
3248                // We specify the columns for DELETE, so if any columns types don't
3249                // match, it's because it's an INSERT.
3250                if desc_query.arity() > desc.arity() {
3251                    sql_bail!(
3252                        "statement {}: INSERT has more expressions than target columns",
3253                        idx
3254                    );
3255                }
3256                if desc_query.arity() < desc.arity() {
3257                    sql_bail!(
3258                        "statement {}: INSERT has more target columns than expressions",
3259                        idx
3260                    );
3261                }
3262                // Ensure the types of the source query match the types of the target table,
3263                // installing assignment casts where necessary and possible.
3264                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3265                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3266                let expr = expr.map_err(|e| {
3267                    sql_err!(
3268                        "statement {}: column {} is of type {} but expression is of type {}",
3269                        idx,
3270                        desc.get_name(e.column).quoted(),
3271                        qcx.humanize_scalar_type(&e.target_type, false),
3272                        qcx.humanize_scalar_type(&e.source_type, false),
3273                    )
3274                })?;
3275
3276                // Update ct nullability as necessary. The `ne` above verified that the
3277                // types are the same len.
3278                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3279                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3280                if updated {
3281                    let new_types = zip_types().map(|(ct, q)| {
3282                        let mut ct = ct.clone();
3283                        if q.nullable {
3284                            ct.nullable = true;
3285                        }
3286                        ct
3287                    });
3288                    *desc = RelationDesc::from_names_and_types(
3289                        desc.iter_names().cloned().zip_eq(new_types),
3290                    );
3291                }
3292
3293                expr
3294            }
3295        };
3296        match stmt {
3297            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3298            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3299        }
3300    }
3301    // TODO(ct3): Collect things by output and assert that there is only one (or
3302    // support multiple outputs).
3303    let expr = exprs
3304        .into_iter()
3305        .reduce(|acc, expr| acc.union(expr))
3306        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3307    let dependencies = expr
3308        .depends_on()
3309        .into_iter()
3310        .map(|gid| scx.catalog.resolve_item_id(&gid))
3311        .collect();
3312
3313    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3314    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3315    if let Some(dup) = column_names.iter().duplicates().next() {
3316        sql_bail!("column {} specified more than once", dup.quoted());
3317    }
3318
3319    // Check for an object in the catalog with this same name
3320    let name = match &ct_name {
3321        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3322        ResolvedItemName::ContinualTask { name, .. } => {
3323            let name = scx.allocate_qualified_name(name.clone())?;
3324            let full_name = scx.catalog.resolve_full_name(&name);
3325            let partial_name = PartialItemName::from(full_name.clone());
3326            // For PostgreSQL compatibility, we need to prevent creating this when there
3327            // is an existing object *or* type of the same name.
3328            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3329                return Err(PlanError::ItemAlreadyExists {
3330                    name: full_name.to_string(),
3331                    item_type: item.item_type(),
3332                });
3333            }
3334            name
3335        }
3336        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3337        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3338    };
3339
3340    let as_of = stmt.as_of.map(Timestamp::from);
3341    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3342        name,
3343        placeholder_id,
3344        desc,
3345        input_id: input.global_id(),
3346        with_snapshot: snapshot.unwrap_or(true),
3347        continual_task: MaterializedView {
3348            create_sql,
3349            expr,
3350            dependencies,
3351            column_names,
3352            replacement_target: None,
3353            cluster_id,
3354            target_replica: None,
3355            non_null_assertions: Vec::new(),
3356            compaction_window: None,
3357            refresh_schedule: None,
3358            as_of,
3359        },
3360    }))
3361}
3362
3363fn continual_task_query<'a>(
3364    ct_name: &ResolvedItemName,
3365    stmt: &'a ast::ContinualTaskStmt<Aug>,
3366) -> Option<ast::Query<Aug>> {
3367    match stmt {
3368        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3369            table_name: _,
3370            columns,
3371            source,
3372            returning,
3373        }) => {
3374            if !columns.is_empty() || !returning.is_empty() {
3375                return None;
3376            }
3377            match source {
3378                ast::InsertSource::Query(query) => Some(query.clone()),
3379                ast::InsertSource::DefaultValues => None,
3380            }
3381        }
3382        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3383            table_name: _,
3384            alias,
3385            using,
3386            selection,
3387        }) => {
3388            if !using.is_empty() {
3389                return None;
3390            }
3391            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
3392            let from = ast::TableWithJoins {
3393                relation: ast::TableFactor::Table {
3394                    name: ct_name.clone(),
3395                    alias: alias.clone(),
3396                },
3397                joins: Vec::new(),
3398            };
3399            let select = ast::Select {
3400                from: vec![from],
3401                selection: selection.clone(),
3402                distinct: None,
3403                projection: vec![ast::SelectItem::Wildcard],
3404                group_by: Vec::new(),
3405                having: None,
3406                qualify: None,
3407                options: Vec::new(),
3408            };
3409            let query = ast::Query {
3410                ctes: ast::CteBlock::Simple(Vec::new()),
3411                body: ast::SetExpr::Select(Box::new(select)),
3412                order_by: Vec::new(),
3413                limit: None,
3414                offset: None,
3415            };
3416            // Then negate it to turn it into retractions (after planning it).
3417            Some(query)
3418        }
3419    }
3420}
3421
// `WITH` options accepted by `CREATE CONTINUAL TASK`; generates
// `ContinualTaskOptionExtracted`.
generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3423
3424pub fn describe_create_sink(
3425    _: &StatementContext,
3426    _: CreateSinkStatement<Aug>,
3427) -> Result<StatementDesc, PlanError> {
3428    Ok(StatementDesc::new(None))
3429}
3430
// `WITH` options accepted by `CREATE SINK`; generates
// `CreateSinkOptionExtracted`.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3438
3439pub fn plan_create_sink(
3440    scx: &StatementContext,
3441    stmt: CreateSinkStatement<Aug>,
3442) -> Result<Plan, PlanError> {
3443    // Check for an object in the catalog with this same name
3444    let Some(name) = stmt.name.clone() else {
3445        return Err(PlanError::MissingName(CatalogItemType::Sink));
3446    };
3447    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3448    let full_name = scx.catalog.resolve_full_name(&name);
3449    let partial_name = PartialItemName::from(full_name.clone());
3450    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3451        return Err(PlanError::ItemAlreadyExists {
3452            name: full_name.to_string(),
3453            item_type: item.item_type(),
3454        });
3455    }
3456
3457    plan_sink(scx, stmt)
3458}
3459
3460/// This function will plan a sink as if it does not exist in the catalog. This is so the planning
3461/// logic is reused by both CREATE SINK and ALTER SINK planning. It is the responsibility of the
3462/// callers (plan_create_sink and plan_alter_sink) to check for name collisions if this is
3463/// important.
3464fn plan_sink(
3465    scx: &StatementContext,
3466    mut stmt: CreateSinkStatement<Aug>,
3467) -> Result<Plan, PlanError> {
3468    let CreateSinkStatement {
3469        name,
3470        in_cluster: _,
3471        from,
3472        connection,
3473        format,
3474        envelope,
3475        mode,
3476        if_not_exists,
3477        with_options,
3478    } = stmt.clone();
3479
3480    let Some(name) = name else {
3481        return Err(PlanError::MissingName(CatalogItemType::Sink));
3482    };
3483    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3484
3485    let envelope = match (&connection, envelope, mode) {
3486        // Kafka sinks use ENVELOPE
3487        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3488            SinkEnvelope::Upsert
3489        }
3490        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3491            SinkEnvelope::Debezium
3492        }
3493        (CreateSinkConnection::Kafka { .. }, None, None) => {
3494            sql_bail!("ENVELOPE clause is required")
3495        }
3496        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3497            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3498        }
3499        // Iceberg sinks use MODE
3500        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3501            SinkEnvelope::Upsert
3502        }
3503        (CreateSinkConnection::Iceberg { .. }, None, None) => {
3504            sql_bail!("MODE clause is required")
3505        }
3506        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3507            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3508        }
3509    };
3510
3511    let from_name = &from;
3512    let from = scx.get_item_by_resolved_name(&from)?;
3513
3514    {
3515        use CatalogItemType::*;
3516        match from.item_type() {
3517            Table | Source | MaterializedView | ContinualTask => {
3518                if from.replacement_target().is_some() {
3519                    let name = scx.catalog.minimal_qualification(from.name());
3520                    return Err(PlanError::InvalidSinkFrom {
3521                        name: name.to_string(),
3522                        item_type: format!("replacement {}", from.item_type()),
3523                    });
3524                }
3525            }
3526            Sink | View | Index | Type | Func | Secret | Connection => {
3527                let name = scx.catalog.minimal_qualification(from.name());
3528                return Err(PlanError::InvalidSinkFrom {
3529                    name: name.to_string(),
3530                    item_type: from.item_type().to_string(),
3531                });
3532            }
3533        }
3534    }
3535
3536    if from.id().is_system() {
3537        bail_unsupported!("creating a sink directly on a catalog object");
3538    }
3539
3540    let desc = from.relation_desc().expect("item type checked above");
3541    let key_indices = match &connection {
3542        CreateSinkConnection::Kafka { key: Some(key), .. }
3543        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3544            let key_columns = key
3545                .key_columns
3546                .clone()
3547                .into_iter()
3548                .map(normalize::column_name)
3549                .collect::<Vec<_>>();
3550            let mut uniq = BTreeSet::new();
3551            for col in key_columns.iter() {
3552                if !uniq.insert(col) {
3553                    sql_bail!("duplicate column referenced in KEY: {}", col);
3554                }
3555            }
3556            let indices = key_columns
3557                .iter()
3558                .map(|col| -> anyhow::Result<usize> {
3559                    let name_idx =
3560                        desc.get_by_name(col)
3561                            .map(|(idx, _type)| idx)
3562                            .ok_or_else(|| {
3563                                sql_err!("column referenced in KEY does not exist: {}", col)
3564                            })?;
3565                    if desc.get_unambiguous_name(name_idx).is_none() {
3566                        sql_err!("column referenced in KEY is ambiguous: {}", col);
3567                    }
3568                    Ok(name_idx)
3569                })
3570                .collect::<Result<Vec<_>, _>>()?;
3571            let is_valid_key = desc
3572                .typ()
3573                .keys
3574                .iter()
3575                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3576
3577            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3578                if key.not_enforced {
3579                    scx.catalog
3580                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3581                            key: key_columns.clone(),
3582                            name: name.item.clone(),
3583                        })
3584                } else {
3585                    return Err(PlanError::UpsertSinkWithInvalidKey {
3586                        name: from_name.full_name_str(),
3587                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3588                        valid_keys: desc
3589                            .typ()
3590                            .keys
3591                            .iter()
3592                            .map(|key| {
3593                                key.iter()
3594                                    .map(|col| desc.get_name(*col).as_str().into())
3595                                    .collect()
3596                            })
3597                            .collect(),
3598                    });
3599                }
3600            }
3601            Some(indices)
3602        }
3603        CreateSinkConnection::Kafka { key: None, .. }
3604        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3605    };
3606
3607    let headers_index = match &connection {
3608        CreateSinkConnection::Kafka {
3609            headers: Some(headers),
3610            ..
3611        } => {
3612            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3613
3614            match envelope {
3615                SinkEnvelope::Upsert => (),
3616                SinkEnvelope::Debezium => {
3617                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3618                }
3619            };
3620
3621            let headers = normalize::column_name(headers.clone());
3622            let (idx, ty) = desc
3623                .get_by_name(&headers)
3624                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3625
3626            if desc.get_unambiguous_name(idx).is_none() {
3627                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3628            }
3629
3630            match &ty.scalar_type {
3631                SqlScalarType::Map { value_type, .. }
3632                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3633                _ => sql_bail!(
3634                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3635                ),
3636            }
3637
3638            Some(idx)
3639        }
3640        _ => None,
3641    };
3642
3643    // pick the first valid natural relation key, if any
3644    let relation_key_indices = desc.typ().keys.get(0).cloned();
3645
3646    let key_desc_and_indices = key_indices.map(|key_indices| {
3647        let cols = desc
3648            .iter()
3649            .map(|(name, ty)| (name.clone(), ty.clone()))
3650            .collect::<Vec<_>>();
3651        let (names, types): (Vec<_>, Vec<_>) =
3652            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3653        let typ = SqlRelationType::new(types);
3654        (RelationDesc::new(typ, names), key_indices)
3655    });
3656
3657    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3658        return Err(PlanError::UpsertSinkWithoutKey);
3659    }
3660
3661    let CreateSinkOptionExtracted {
3662        snapshot,
3663        version,
3664        partition_strategy: _,
3665        seen: _,
3666        commit_interval,
3667    } = with_options.try_into()?;
3668
3669    let connection_builder = match connection {
3670        CreateSinkConnection::Kafka {
3671            connection,
3672            options,
3673            ..
3674        } => kafka_sink_builder(
3675            scx,
3676            connection,
3677            options,
3678            format,
3679            relation_key_indices,
3680            key_desc_and_indices,
3681            headers_index,
3682            desc.into_owned(),
3683            envelope,
3684            from.id(),
3685            commit_interval,
3686        )?,
3687        CreateSinkConnection::Iceberg {
3688            connection,
3689            aws_connection,
3690            options,
3691            ..
3692        } => iceberg_sink_builder(
3693            scx,
3694            connection,
3695            aws_connection,
3696            options,
3697            relation_key_indices,
3698            key_desc_and_indices,
3699            commit_interval,
3700        )?,
3701    };
3702
3703    // WITH SNAPSHOT defaults to true
3704    let with_snapshot = snapshot.unwrap_or(true);
3705    // VERSION defaults to 0
3706    let version = version.unwrap_or(0);
3707
3708    // We will rewrite the cluster if one is not provided, so we must use the
3709    // `in_cluster` value we plan to normalize when we canonicalize the create
3710    // statement.
3711    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3712    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3713
3714    Ok(Plan::CreateSink(CreateSinkPlan {
3715        name,
3716        sink: Sink {
3717            create_sql,
3718            from: from.global_id(),
3719            connection: connection_builder,
3720            envelope,
3721            version,
3722            commit_interval,
3723        },
3724        with_snapshot,
3725        if_not_exists,
3726        in_cluster: in_cluster.id(),
3727    }))
3728}
3729
3730fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3731    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3732
3733    let existing_keys = desc
3734        .typ()
3735        .keys
3736        .iter()
3737        .map(|key_columns| {
3738            key_columns
3739                .iter()
3740                .map(|col| desc.get_name(*col).as_str())
3741                .join(", ")
3742        })
3743        .join(", ");
3744
3745    sql_err!(
3746        "Key constraint ({}) conflicts with existing key ({})",
3747        user_keys,
3748        existing_keys
3749    )
3750}
3751
/// Creating this by hand instead of using generate_extracted_config! macro
/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    // Options encountered so far; used to reject duplicate specifications.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    // Explicit Avro fullname for the key schema, if given.
    pub(crate) avro_key_fullname: Option<String>,
    // Explicit Avro fullname for the value schema, if given.
    pub(crate) avro_value_fullname: Option<String>,
    // Whether nullable fields should get `null` defaults in generated schemas.
    pub(crate) null_defaults: bool,
    // `DOC ON` comments targeting the value schema.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    // `DOC ON` comments targeting the key schema.
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    // Schema-registry compatibility level for the key schema, if given.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    // Schema-registry compatibility level for the value schema, if given.
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3765
3766impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3767    type Error = crate::plan::PlanError;
3768    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3769        let mut extracted = CsrConfigOptionExtracted::default();
3770        let mut common_doc_comments = BTreeMap::new();
3771        for option in v {
3772            if !extracted.seen.insert(option.name.clone()) {
3773                return Err(PlanError::Unstructured({
3774                    format!("{} specified more than once", option.name)
3775                }));
3776            }
3777            let option_name = option.name.clone();
3778            let option_name_str = option_name.to_ast_string_simple();
3779            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3780                option_name: option_name.to_ast_string_simple(),
3781                err: e.into(),
3782            };
3783            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3784                val.map(|s| match s {
3785                    WithOptionValue::Value(Value::String(s)) => {
3786                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3787                    }
3788                    _ => Err("must be a string".to_string()),
3789                })
3790                .transpose()
3791                .map_err(PlanError::Unstructured)
3792                .map_err(better_error)
3793            };
3794            match option.name {
3795                CsrConfigOptionName::AvroKeyFullname => {
3796                    extracted.avro_key_fullname =
3797                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3798                }
3799                CsrConfigOptionName::AvroValueFullname => {
3800                    extracted.avro_value_fullname =
3801                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3802                }
3803                CsrConfigOptionName::NullDefaults => {
3804                    extracted.null_defaults =
3805                        <bool>::try_from_value(option.value).map_err(better_error)?;
3806                }
3807                CsrConfigOptionName::AvroDocOn(doc_on) => {
3808                    let value = String::try_from_value(option.value.ok_or_else(|| {
3809                        PlanError::InvalidOptionValue {
3810                            option_name: option_name_str,
3811                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3812                        }
3813                    })?)
3814                    .map_err(better_error)?;
3815                    let key = match doc_on.identifier {
3816                        DocOnIdentifier::Column(ast::ColumnName {
3817                            relation: ResolvedItemName::Item { id, .. },
3818                            column: ResolvedColumnReference::Column { name, index: _ },
3819                        }) => DocTarget::Field {
3820                            object_id: id,
3821                            column_name: name,
3822                        },
3823                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3824                            DocTarget::Type(id)
3825                        }
3826                        _ => unreachable!(),
3827                    };
3828
3829                    match doc_on.for_schema {
3830                        DocOnSchema::KeyOnly => {
3831                            extracted.key_doc_options.insert(key, value);
3832                        }
3833                        DocOnSchema::ValueOnly => {
3834                            extracted.value_doc_options.insert(key, value);
3835                        }
3836                        DocOnSchema::All => {
3837                            common_doc_comments.insert(key, value);
3838                        }
3839                    }
3840                }
3841                CsrConfigOptionName::KeyCompatibilityLevel => {
3842                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3843                }
3844                CsrConfigOptionName::ValueCompatibilityLevel => {
3845                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3846                }
3847            }
3848        }
3849
3850        for (key, value) in common_doc_comments {
3851            if !extracted.key_doc_options.contains_key(&key) {
3852                extracted.key_doc_options.insert(key.clone(), value.clone());
3853            }
3854            if !extracted.value_doc_options.contains_key(&key) {
3855                extracted.value_doc_options.insert(key, value);
3856            }
3857        }
3858        Ok(extracted)
3859    }
3860}
3861
3862fn iceberg_sink_builder(
3863    scx: &StatementContext,
3864    catalog_connection: ResolvedItemName,
3865    aws_connection: ResolvedItemName,
3866    options: Vec<IcebergSinkConfigOption<Aug>>,
3867    relation_key_indices: Option<Vec<usize>>,
3868    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3869    commit_interval: Option<Duration>,
3870) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3871    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3872    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3873    let catalog_connection_id = catalog_connection_item.id();
3874    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3875    let aws_connection_id = aws_connection_item.id();
3876    if !matches!(
3877        catalog_connection_item.connection()?,
3878        Connection::IcebergCatalog(_)
3879    ) {
3880        sql_bail!(
3881            "{} is not an iceberg catalog connection",
3882            scx.catalog
3883                .resolve_full_name(catalog_connection_item.name())
3884                .to_string()
3885                .quoted()
3886        );
3887    };
3888
3889    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3890        sql_bail!(
3891            "{} is not an AWS connection",
3892            scx.catalog
3893                .resolve_full_name(aws_connection_item.name())
3894                .to_string()
3895                .quoted()
3896        );
3897    }
3898
3899    let IcebergSinkConfigOptionExtracted {
3900        table,
3901        namespace,
3902        seen: _,
3903    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3904
3905    let Some(table) = table else {
3906        sql_bail!("Iceberg sink must specify TABLE");
3907    };
3908    let Some(namespace) = namespace else {
3909        sql_bail!("Iceberg sink must specify NAMESPACE");
3910    };
3911    if commit_interval.is_none() {
3912        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3913    }
3914
3915    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3916        catalog_connection_id,
3917        catalog_connection: catalog_connection_id,
3918        aws_connection_id,
3919        aws_connection: aws_connection_id,
3920        table,
3921        namespace,
3922        relation_key_indices,
3923        key_desc_and_indices,
3924    }))
3925}
3926
/// Plans the connection, serialization formats, and topic options for a
/// Kafka sink.
///
/// Resolves the referenced Kafka connection, validates the sink's `WITH`
/// options, maps the `FORMAT` specifier onto concrete key/value formats
/// (generating Avro schemas where requested), and plans the optional
/// `PARTITION BY` expression against the sink's value relation.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // Get Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    // `COMMIT INTERVAL` belongs to other sink kinds (e.g. Iceberg, which
    // requires it); it has no meaning for Kafka sinks.
    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // `LEGACY IDS` selects the legacy id scheme and is mutually exclusive
    // with an explicit `TRANSACTIONAL ID PREFIX`.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    // Same exclusivity rule for the progress topic's consumer group id.
    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    // Validates that an optional count option is strictly positive and
    // converts it into a `NonNeg` for the topic options below.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Helper method to parse avro connection options for format specifiers that use avro
    // for either key or value encoding.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        // SEED and the schema-resolution strategies only apply to sources;
        // reject them for sinks.
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        // The referenced connection must be a schema registry connection.
        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        // AVRO KEY FULLNAME requires a key, and key/value fullnames must be
        // specified together (or not at all).
        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps a single FORMAT clause onto the concrete serialization for one
    // side (key or value) of the sink, given that side's relation desc.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        // BYTES/TEXT only make sense for single-column relations.
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // Generate the Avro schema for this side. The key schema defaults
            // its record name to "row"; the value schema defaults to
            // "envelope" and is Debezium-shaped when that envelope is in use.
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional PARTITION BY expression against the value relation.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    // Under the Debezium envelope, only key columns may be
                    // referenced; mark all non-key columns so that any
                    // reference to them raises a dedicated error.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            // PARTITION BY is a scalar expression: no aggregates, subqueries,
            // parameters, or window functions.
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression must be assignment-castable to uint8.
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // Map from the format specifier of the statement to the individual key/value formats for the sink.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            // A bare format applies to both the key (if any) and the value.
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4212
4213pub fn describe_create_index(
4214    _: &StatementContext,
4215    _: CreateIndexStatement<Aug>,
4216) -> Result<StatementDesc, PlanError> {
4217    Ok(StatementDesc::new(None))
4218}
4219
/// Plans `CREATE INDEX`.
///
/// Validates that the target item can be indexed, fills in default key parts
/// when none were given, derives an index name (PostgreSQL-style when
/// unnamed), checks for name collisions, resolves the target cluster, and
/// normalizes the statement for storage as the index's `create_sql`.
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    // Only relation-producing items may be indexed, and not replacement
    // targets of those items.
    {
        use CatalogItemType::*;
        match on.item_type() {
            Table | Source | View | MaterializedView | ContinualTask => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on.relation_desc().expect("item type checked above");

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // `key_parts` is None if we're creating a "default" index.
            // Use the relation's default key, referring to columns by name
            // where unambiguous and by 1-based position otherwise.
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    // Use the explicit name if given; otherwise derive one from the indexed
    // item and key expressions.
    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            // We're trying to create the "default" index.
            idx_name.item += "_primary_idx";
        } else {
            // Use PG schema for automatically naming indexes:
            // `<table>_<_-separated indexed expressions>_idx`
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        // For derived names, pick a non-colliding variant unless
        // IF NOT EXISTS was requested.
        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating indexes when
    // there is an existing object *or* type of the same name.
    //
    // Technically, we only need to prevent coexistence of indexes and types
    // that have an associated relation (record types but not list/map types).
    // Enforcing that would be more complicated, though. It's backwards
    // compatible to weaken this restriction in the future.
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    // Fall back to the active cluster when IN CLUSTER was not specified.
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Normalize `stmt`.
    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    // Pull the RETAIN HISTORY option (if any) out as the compaction window.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}
4373
4374pub fn describe_create_type(
4375    _: &StatementContext,
4376    _: CreateTypeStatement<Aug>,
4377) -> Result<StatementDesc, PlanError> {
4378    Ok(StatementDesc::new(None))
4379}
4380
/// Plans `CREATE TYPE ... AS LIST | MAP | (record)`.
///
/// Validates that every referenced element/key/value/field type is a named
/// catalog type with valid modifiers, then produces a `CreateTypePlan`
/// carrying the corresponding `CatalogType`.
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    /// Ensures `data_type` is a named catalog type (not an anonymous one),
    /// is of class "type", is not `char` (unsupported inside lists/maps),
    /// and has valid modifiers; returns the type's id and modifiers.
    /// `as_type` and `key` are only used to phrase error messages.
    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate that the modifiers are actually valid.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    // Build the catalog representation for each supported AS variant.
    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            // Validate each record field's type in declaration order.
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating types when
    // there is an existing object *or* type of the same name.
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}
4497
// Generates `CreateTypeListOptionExtracted`, used by `plan_create_type` to
// extract the `ELEMENT TYPE` option of `CREATE TYPE ... AS LIST`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Generates `CreateTypeMapOptionExtracted`, used by `plan_create_type` to
// extract the `KEY TYPE` and `VALUE TYPE` options of `CREATE TYPE ... AS MAP`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4505
/// The planned effect of a single `ALTER ROLE` option: either a change to the
/// role's attributes or a change to one of its role-scoped variables.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Update the role's attributes.
    Attributes(PlannedRoleAttributes),
    /// Set or reset a role-scoped variable.
    Variable(PlannedRoleVariable),
}
4511
/// Role attributes extracted from `CREATE ROLE`/`ALTER ROLE` options.
///
/// A `None` field means the statement did not mention that attribute.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// Planned value of the `INHERIT`/`NOINHERIT` attribute.
    pub inherit: Option<bool>,
    /// The password supplied via the `PASSWORD` attribute, if any.
    pub password: Option<Password>,
    /// SCRAM iteration count; populated from the system variables whenever a
    /// password is set (see `plan_role_attributes`).
    pub scram_iterations: Option<NonZeroU32>,
    /// `nopassword` is set to true if the password is from the parser is None.
    /// This is semantically different than not supplying a password at all,
    /// to allow for unsetting a password.
    pub nopassword: Option<bool>,
    /// Planned value of the `SUPERUSER`/`NOSUPERUSER` attribute.
    pub superuser: Option<bool>,
    /// Planned value of the `LOGIN`/`NOLOGIN` attribute.
    pub login: Option<bool>,
}
4524
4525fn plan_role_attributes(
4526    options: Vec<RoleAttribute>,
4527    scx: &StatementContext,
4528) -> Result<PlannedRoleAttributes, PlanError> {
4529    let mut planned_attributes = PlannedRoleAttributes {
4530        inherit: None,
4531        password: None,
4532        scram_iterations: None,
4533        superuser: None,
4534        login: None,
4535        nopassword: None,
4536    };
4537
4538    for option in options {
4539        match option {
4540            RoleAttribute::Inherit | RoleAttribute::NoInherit
4541                if planned_attributes.inherit.is_some() =>
4542            {
4543                sql_bail!("conflicting or redundant options");
4544            }
4545            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4546                bail_never_supported!(
4547                    "CREATECLUSTER attribute",
4548                    "sql/create-role/#details",
4549                    "Use system privileges instead."
4550                );
4551            }
4552            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4553                bail_never_supported!(
4554                    "CREATEDB attribute",
4555                    "sql/create-role/#details",
4556                    "Use system privileges instead."
4557                );
4558            }
4559            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4560                bail_never_supported!(
4561                    "CREATEROLE attribute",
4562                    "sql/create-role/#details",
4563                    "Use system privileges instead."
4564                );
4565            }
4566            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4567                sql_bail!("conflicting or redundant options");
4568            }
4569
4570            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4571            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4572            RoleAttribute::Password(password) => {
4573                if let Some(password) = password {
4574                    planned_attributes.password = Some(password.into());
4575                    planned_attributes.scram_iterations =
4576                        Some(scx.catalog.system_vars().scram_iterations())
4577                } else {
4578                    planned_attributes.nopassword = Some(true);
4579                }
4580            }
4581            RoleAttribute::SuperUser => {
4582                if planned_attributes.superuser == Some(false) {
4583                    sql_bail!("conflicting or redundant options");
4584                }
4585                planned_attributes.superuser = Some(true);
4586            }
4587            RoleAttribute::NoSuperUser => {
4588                if planned_attributes.superuser == Some(true) {
4589                    sql_bail!("conflicting or redundant options");
4590                }
4591                planned_attributes.superuser = Some(false);
4592            }
4593            RoleAttribute::Login => {
4594                if planned_attributes.login == Some(false) {
4595                    sql_bail!("conflicting or redundant options");
4596                }
4597                planned_attributes.login = Some(true);
4598            }
4599            RoleAttribute::NoLogin => {
4600                if planned_attributes.login == Some(true) {
4601                    sql_bail!("conflicting or redundant options");
4602                }
4603                planned_attributes.login = Some(false);
4604            }
4605        }
4606    }
4607    if planned_attributes.inherit == Some(false) {
4608        bail_unsupported!("non inherit roles");
4609    }
4610
4611    Ok(planned_attributes)
4612}
4613
/// A planned change to a role's session-variable defaults, as produced by
/// [`plan_role_variable`]: either set a variable to a value or reset it.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Set the variable `name` to `value`.
    Set { name: String, value: VariableValue },
    /// Reset the variable `name` to its default.
    Reset { name: String },
}
4619
4620impl PlannedRoleVariable {
4621    pub fn name(&self) -> &str {
4622        match self {
4623            PlannedRoleVariable::Set { name, .. } => name,
4624            PlannedRoleVariable::Reset { name } => name,
4625        }
4626    }
4627}
4628
4629fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4630    let plan = match variable {
4631        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4632            name: name.to_string(),
4633            value: scl::plan_set_variable_to(value)?,
4634        },
4635        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4636            name: name.to_string(),
4637        },
4638    };
4639    Ok(plan)
4640}
4641
/// Describes the result of a `CREATE ROLE` statement: it returns no rows.
pub fn describe_create_role(
    _: &StatementContext,
    _: CreateRoleStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
4648
4649pub fn plan_create_role(
4650    scx: &StatementContext,
4651    CreateRoleStatement { name, options }: CreateRoleStatement,
4652) -> Result<Plan, PlanError> {
4653    let attributes = plan_role_attributes(options, scx)?;
4654    Ok(Plan::CreateRole(CreateRolePlan {
4655        name: normalize::ident(name),
4656        attributes: attributes.into(),
4657    }))
4658}
4659
4660pub fn plan_create_network_policy(
4661    ctx: &StatementContext,
4662    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4663) -> Result<Plan, PlanError> {
4664    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4665    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4666
4667    let Some(rule_defs) = policy_options.rules else {
4668        sql_bail!("RULES must be specified when creating network policies.");
4669    };
4670
4671    let mut rules = vec![];
4672    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4673        let NetworkPolicyRuleOptionExtracted {
4674            seen: _,
4675            direction,
4676            action,
4677            address,
4678        } = options.try_into()?;
4679        let (direction, action, address) = match (direction, action, address) {
4680            (Some(direction), Some(action), Some(address)) => (
4681                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4682                NetworkPolicyRuleAction::try_from(action.as_str())?,
4683                PolicyAddress::try_from(address.as_str())?,
4684            ),
4685            (_, _, _) => {
4686                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4687            }
4688        };
4689        rules.push(NetworkPolicyRule {
4690            name: normalize::ident(name),
4691            direction,
4692            action,
4693            address,
4694        });
4695    }
4696
4697    if rules.len()
4698        > ctx
4699            .catalog
4700            .system_vars()
4701            .max_rules_per_network_policy()
4702            .try_into()?
4703    {
4704        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4705    }
4706
4707    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4708        name: normalize::ident(name),
4709        rules,
4710    }))
4711}
4712
4713pub fn plan_alter_network_policy(
4714    ctx: &StatementContext,
4715    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4716) -> Result<Plan, PlanError> {
4717    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4718
4719    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4720    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4721
4722    let Some(rule_defs) = policy_options.rules else {
4723        sql_bail!("RULES must be specified when creating network policies.");
4724    };
4725
4726    let mut rules = vec![];
4727    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4728        let NetworkPolicyRuleOptionExtracted {
4729            seen: _,
4730            direction,
4731            action,
4732            address,
4733        } = options.try_into()?;
4734
4735        let (direction, action, address) = match (direction, action, address) {
4736            (Some(direction), Some(action), Some(address)) => (
4737                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4738                NetworkPolicyRuleAction::try_from(action.as_str())?,
4739                PolicyAddress::try_from(address.as_str())?,
4740            ),
4741            (_, _, _) => {
4742                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4743            }
4744        };
4745        rules.push(NetworkPolicyRule {
4746            name: normalize::ident(name),
4747            direction,
4748            action,
4749            address,
4750        });
4751    }
4752    if rules.len()
4753        > ctx
4754            .catalog
4755            .system_vars()
4756            .max_rules_per_network_policy()
4757            .try_into()?
4758    {
4759        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4760    }
4761
4762    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4763        id: policy.id(),
4764        name: normalize::ident(name),
4765        rules,
4766    }))
4767}
4768
/// Describes the result of a `CREATE CLUSTER` statement: it returns no rows.
pub fn describe_create_cluster(
    _: &StatementContext,
    _: CreateClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
4775
// WARNING:
// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
// that option stays the same. If you were to give a default value here, then not giving that option
// to ALTER CLUSTER would always reset the value of that option to the default.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Options of `CREATE/ALTER NETWORK POLICY`; consumed by
// `plan_create_network_policy` and `plan_alter_network_policy` below.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Per-rule options of a network policy rule definition; all three are
// required by the planners above.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// Option of `ALTER CLUSTER ... WITH (...)`; consumer not visible in this
// chunk — presumably the ALTER CLUSTER planner.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// Options of the `WAIT UNTIL READY (...)` clause; consumer not visible in
// this chunk.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Per-cluster optimizer feature overrides (`FEATURES (...)`), system-user
// only (see `plan_create_cluster_inner`). `None` means "no override".
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4829
4830/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4831///
4832/// The reverse of [`unplan_create_cluster`].
4833pub fn plan_create_cluster(
4834    scx: &StatementContext,
4835    stmt: CreateClusterStatement<Aug>,
4836) -> Result<Plan, PlanError> {
4837    let plan = plan_create_cluster_inner(scx, stmt)?;
4838
4839    // Roundtrip through unplan and make sure that we end up with the same plan.
4840    if let CreateClusterVariant::Managed(_) = &plan.variant {
4841        let stmt = unplan_create_cluster(scx, plan.clone())
4842            .map_err(|e| PlanError::Replan(e.to_string()))?;
4843        let create_sql = stmt.to_ast_string_stable();
4844        let stmt = parse::parse(&create_sql)
4845            .map_err(|e| PlanError::Replan(e.to_string()))?
4846            .into_element()
4847            .ast;
4848        let (stmt, _resolved_ids) =
4849            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4850        let stmt = match stmt {
4851            Statement::CreateCluster(stmt) => stmt,
4852            stmt => {
4853                return Err(PlanError::Replan(format!(
4854                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4855                )));
4856            }
4857        };
4858        let replan =
4859            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4860        if plan != replan {
4861            return Err(PlanError::Replan(format!(
4862                "replan does not match: plan={plan:?}, replan={replan:?}"
4863            )));
4864        }
4865    }
4866
4867    Ok(Plan::CreateCluster(plan))
4868}
4869
/// Plans a `CREATE CLUSTER` statement into a [`CreateClusterPlan`], without
/// the roundtrip check performed by [`plan_create_cluster`].
///
/// Handles both managed clusters (SIZE-based, replicas provisioned per
/// `REPLICATION FACTOR`) and unmanaged clusters (explicit `REPLICAS (...)`).
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Absent an explicit MANAGED option, the cluster is managed unless the
    // user supplied an explicit replica set.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system users.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        // Managed clusters: SIZE is required; options that describe explicit
        // replicas are rejected.
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
            // we'll be able to remove the `DISK` option entirely.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // If we have a non-trivial schedule, then let's not have any replicas initially,
            // to avoid quickly going back and forth if the schedule doesn't want a replica
            // initially.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Plan OptimizerFeatureOverrides.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        // Unmanaged clusters: the user manages replicas explicitly, so every
        // managed-cluster option must be absent.
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        // Plan each explicitly-defined replica.
        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
5036
/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
///
/// The reverse of [`plan_create_cluster`]. Only managed clusters are
/// supported; unmanaged clusters bail with "unsupported".
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Exhaustive destructure: adding a field to `OptimizerFeatureOverrides`
            // forces a decision here about whether it is a cluster feature.
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
            } = optimizer_feature_overrides;
            // The ones from above that don't occur below are not wired up to cluster features.
            let features_extracted = ClusterFeatureExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // Replication factor cannot be explicitly specified with a refresh schedule, it's
            // always 1 or less.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
5137
// Options accepted on a cluster replica definition; consumed by
// `plan_replica_config`. The address/workers options are for unorchestrated
// (unsafe-mode) replicas.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
5153
/// Plans the options of a single cluster replica into a [`ReplicaConfig`].
///
/// A replica is either orchestrated (specified via `SIZE`) or unorchestrated
/// (explicit `*CTL ADDRESSES`, gated behind an unsafe-mode feature flag);
/// mixing options from both groups is an error.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        // NOTE(review): `ComputeAddresses`, `StorageAddresses`, and `Workers`
        // are accepted by the option parser but ignored here — confirm this
        // is intentional.
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // Common cases we expect end users to hit.
        (None, _, None, None, None) => {
            // We don't mention the unmanaged options in the error message
            // because they are only available in unsafe mode.
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            // When manually testing Materialize in unsafe mode, it's easy to
            // accidentally omit one of these options, so we try to produce
            // helpful error messages.
            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            // We don't bother trying to produce a more helpful error message
            // here because no user is likely to hit this path.
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5246
5247/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5248///
5249/// The reverse of [`unplan_compute_replica_config`].
5250fn plan_compute_replica_config(
5251    introspection_interval: Option<OptionalDuration>,
5252    introspection_debugging: bool,
5253) -> Result<ComputeReplicaConfig, PlanError> {
5254    let introspection_interval = introspection_interval
5255        .map(|OptionalDuration(i)| i)
5256        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5257    let introspection = match introspection_interval {
5258        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5259            interval,
5260            debugging: introspection_debugging,
5261        }),
5262        None if introspection_debugging => {
5263            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5264        }
5265        None => None,
5266    };
5267    let compute = ComputeReplicaConfig { introspection };
5268    Ok(compute)
5269}
5270
5271/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5272///
5273/// The reverse of [`plan_compute_replica_config`].
5274fn unplan_compute_replica_config(
5275    compute_replica_config: ComputeReplicaConfig,
5276) -> (Option<OptionalDuration>, bool) {
5277    match compute_replica_config.introspection {
5278        Some(ComputeReplicaIntrospectionConfig {
5279            debugging,
5280            interval,
5281        }) => (Some(OptionalDuration(Some(interval))), debugging),
5282        None => (Some(OptionalDuration(None)), false),
5283    }
5284}
5285
5286/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5287///
5288/// The reverse of [`unplan_cluster_schedule`].
5289fn plan_cluster_schedule(
5290    schedule: ClusterScheduleOptionValue,
5291) -> Result<ClusterSchedule, PlanError> {
5292    Ok(match schedule {
5293        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5294        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5295        ClusterScheduleOptionValue::Refresh {
5296            hydration_time_estimate: None,
5297        } => ClusterSchedule::Refresh {
5298            hydration_time_estimate: Duration::from_millis(0),
5299        },
5300        // Otherwise we convert the `IntervalValue` to a `Duration`.
5301        ClusterScheduleOptionValue::Refresh {
5302            hydration_time_estimate: Some(interval_value),
5303        } => {
5304            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5305            if interval.as_microseconds() < 0 {
5306                sql_bail!(
5307                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5308                    interval
5309                );
5310            }
5311            if interval.months != 0 {
5312                // This limitation is because we want this interval to be cleanly convertable
5313                // to a unix epoch timestamp difference. When the interval involves months, then
5314                // this is not true anymore, because months have variable lengths.
5315                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5316            }
5317            let duration = interval.duration()?;
5318            if u64::try_from(duration.as_millis()).is_err()
5319                || Interval::from_duration(&duration).is_err()
5320            {
5321                sql_bail!("HYDRATION TIME ESTIMATE too large");
5322            }
5323            ClusterSchedule::Refresh {
5324                hydration_time_estimate: duration,
5325            }
5326        }
5327    })
5328}
5329
5330/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5331///
5332/// The reverse of [`plan_cluster_schedule`].
5333fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5334    match schedule {
5335        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5336        ClusterSchedule::Refresh {
5337            hydration_time_estimate,
5338        } => {
5339            let interval = Interval::from_duration(&hydration_time_estimate)
5340                .expect("planning ensured that this is convertible back to Interval");
5341            let interval_value = literal::unplan_interval(&interval);
5342            ClusterScheduleOptionValue::Refresh {
5343                hydration_time_estimate: Some(interval_value),
5344            }
5345        }
5346    }
5347}
5348
/// Describes the result of a `CREATE CLUSTER REPLICA` statement: it returns no rows.
pub fn describe_create_cluster_replica(
    _: &StatementContext,
    _: CreateClusterReplicaStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
5355
5356pub fn plan_create_cluster_replica(
5357    scx: &StatementContext,
5358    CreateClusterReplicaStatement {
5359        definition: ReplicaDefinition { name, options },
5360        of_cluster,
5361    }: CreateClusterReplicaStatement<Aug>,
5362) -> Result<Plan, PlanError> {
5363    let cluster = scx
5364        .catalog
5365        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5366    let current_replica_count = cluster.replica_ids().iter().count();
5367    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5368        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5369        return Err(PlanError::CreateReplicaFailStorageObjects {
5370            current_replica_count,
5371            internal_replica_count,
5372            hypothetical_replica_count: current_replica_count + 1,
5373        });
5374    }
5375
5376    let config = plan_replica_config(scx, options)?;
5377
5378    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5379        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5380            return Err(PlanError::MangedReplicaName(name.into_string()));
5381        }
5382    } else {
5383        ensure_cluster_is_not_managed(scx, cluster.id())?;
5384    }
5385
5386    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5387        name: normalize::ident(name),
5388        cluster_id: cluster.id(),
5389        config,
5390    }))
5391}
5392
/// Describes the result of a `CREATE SECRET` statement: it returns no rows.
pub fn describe_create_secret(
    _: &StatementContext,
    _: CreateSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
5399
/// Plans `CREATE SECRET`.
///
/// Plans the secret's value expression and records a redacted form of the
/// statement in the catalog so the plaintext value is never persisted.
pub fn plan_create_secret(
    scx: &StatementContext,
    stmt: CreateSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSecretStatement {
        name,
        if_not_exists,
        value,
    } = &stmt;

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
    // Replace the secret's value with a placeholder before producing the
    // `create_sql` stored in the catalog, so the plaintext never leaks.
    let mut create_sql_statement = stmt.clone();
    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
    let create_sql =
        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
    // Plan the *original* (unredacted) value expression.
    let secret_as = query::plan_secret_as(scx, value.clone())?;

    let secret = Secret {
        create_sql,
        secret_as,
    };

    Ok(Plan::CreateSecret(CreateSecretPlan {
        name,
        secret,
        if_not_exists: *if_not_exists,
    }))
}
5428
5429pub fn describe_create_connection(
5430    _: &StatementContext,
5431    _: CreateConnectionStatement<Aug>,
5432) -> Result<StatementDesc, PlanError> {
5433    Ok(StatementDesc::new(None))
5434}
5435
5436generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5437
/// Plans `CREATE CONNECTION`.
///
/// Extracts the connection options into typed details, determines whether the
/// connection should be validated at creation time, checks for name
/// collisions, and — for SSH connections — rewrites the recorded statement to
/// carry any public keys produced while planning.
pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    // An explicit VALIDATE option is itself feature-gated.
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    // Without an explicit VALIDATE, fall back to the system default — but only
    // for connection types that validate by default.
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Check for an object in the catalog with this same name (skipped when
    // IF NOT EXISTS was specified).
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, overwrite the public key options based on the
    // connection details, in case we generated new keys during planning.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}
5505
5506fn plan_drop_database(
5507    scx: &StatementContext,
5508    if_exists: bool,
5509    name: &UnresolvedDatabaseName,
5510    cascade: bool,
5511) -> Result<Option<DatabaseId>, PlanError> {
5512    Ok(match resolve_database(scx, name, if_exists)? {
5513        Some(database) => {
5514            if !cascade && database.has_schemas() {
5515                sql_bail!(
5516                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5517                    name,
5518                );
5519            }
5520            Some(database.id())
5521        }
5522        None => None,
5523    })
5524}
5525
5526pub fn describe_drop_objects(
5527    _: &StatementContext,
5528    _: DropObjectsStatement,
5529) -> Result<StatementDesc, PlanError> {
5530    Ok(StatementDesc::new(None))
5531}
5532
/// Plans a `DROP` statement for any object kind except functions (those are
/// rejected in the parser).
///
/// Objects that do not exist produce a `ObjectDoesNotExist` notice rather
/// than an error (the per-kind helpers return `None` when `IF EXISTS` was
/// given). The resulting plan contains both the directly referenced ids and
/// their transitive dependents.
pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        // Dispatch to a kind-specific planner; each returns `None` when the
        // object was missing and `if_exists` allowed that.
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            // Missing object + IF EXISTS: surface a notice, not an error.
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    // Expand the referenced ids to everything that transitively depends on
    // them.
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}
5591
/// Plans dropping a schema, returning its resolved specifiers.
///
/// Yields `Ok(None)` when the schema does not exist and `if_exists` is set.
/// Errors for the temporary schema, for ambient (system) schemas, and —
/// without `cascade` — for schemas that still contain objects.
fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            // Ambient schemas belong to the system and are never droppable.
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            // RESTRICT (the default) refuses to drop a non-empty schema.
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}
5629
5630fn plan_drop_role(
5631    scx: &StatementContext,
5632    if_exists: bool,
5633    name: &Ident,
5634) -> Result<Option<RoleId>, PlanError> {
5635    match scx.catalog.resolve_role(name.as_str()) {
5636        Ok(role) => {
5637            let id = role.id();
5638            if &id == scx.catalog.active_role_id() {
5639                sql_bail!("current role cannot be dropped");
5640            }
5641            for role in scx.catalog.get_roles() {
5642                for (member_id, grantor_id) in role.membership() {
5643                    if &id == grantor_id {
5644                        let member_role = scx.catalog.get_role(member_id);
5645                        sql_bail!(
5646                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5647                            name.as_str(),
5648                            role.name(),
5649                            member_role.name()
5650                        );
5651                    }
5652                }
5653            }
5654            Ok(Some(role.id()))
5655        }
5656        Err(_) if if_exists => Ok(None),
5657        Err(e) => Err(e.into()),
5658    }
5659}
5660
5661fn plan_drop_cluster(
5662    scx: &StatementContext,
5663    if_exists: bool,
5664    name: &Ident,
5665    cascade: bool,
5666) -> Result<Option<ClusterId>, PlanError> {
5667    Ok(match resolve_cluster(scx, name, if_exists)? {
5668        Some(cluster) => {
5669            if !cascade && !cluster.bound_objects().is_empty() {
5670                return Err(PlanError::DependentObjectsStillExist {
5671                    object_type: "cluster".to_string(),
5672                    object_name: cluster.name().to_string(),
5673                    dependents: Vec::new(),
5674                });
5675            }
5676            Some(cluster.id())
5677        }
5678        None => None,
5679    })
5680}
5681
5682fn plan_drop_network_policy(
5683    scx: &StatementContext,
5684    if_exists: bool,
5685    name: &Ident,
5686) -> Result<Option<NetworkPolicyId>, PlanError> {
5687    match scx.catalog.resolve_network_policy(name.as_str()) {
5688        Ok(policy) => {
5689            // TODO(network_policy): When we support role based network policies, check if any role
5690            // currently has the specified policy set.
5691            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5692                Err(PlanError::NetworkPolicyInUse)
5693            } else {
5694                Ok(Some(policy.id()))
5695            }
5696        }
5697        Err(_) if if_exists => Ok(None),
5698        Err(e) => Err(e.into()),
5699    }
5700}
5701
/// Returns `true` if the cluster has any object that requires a single replica.
/// Returns `false` if the cluster has no objects.
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    // If this feature is enabled then all objects support multiple replicas.
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        // Otherwise we check for the existence of sources or sinks, which
        // are the object kinds restricted to a single replica here.
        cluster.bound_objects().iter().any(|id| {
            let item = scx.catalog.get_item(id);
            matches!(
                item.item_type(),
                CatalogItemType::Sink | CatalogItemType::Source
            )
        })
    }
}
5719
5720fn plan_drop_cluster_replica(
5721    scx: &StatementContext,
5722    if_exists: bool,
5723    name: &QualifiedReplica,
5724) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5725    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5726    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5727}
5728
/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
///
/// Yields `Ok(None)` when the item does not exist and `if_exists` is set.
/// Errors for system items and — without `cascade` — for items with
/// dependents that `dependency_prevents_drop` reports as blocking.
fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `DROP VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            // System items are never droppable.
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            // RESTRICT (the default): a blocking dependent fails the drop,
            // naming the first one found.
            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
                //  relies on entry. Unfortunately, we don't have that information readily available.
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}
5785
5786/// Does the dependency `dep` prevent a drop of a non-cascade query?
5787fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5788    match object_type {
5789        ObjectType::Type => true,
5790        _ => match dep.item_type() {
5791            CatalogItemType::Func
5792            | CatalogItemType::Table
5793            | CatalogItemType::Source
5794            | CatalogItemType::View
5795            | CatalogItemType::MaterializedView
5796            | CatalogItemType::Sink
5797            | CatalogItemType::Type
5798            | CatalogItemType::Secret
5799            | CatalogItemType::Connection
5800            | CatalogItemType::ContinualTask => true,
5801            CatalogItemType::Index => false,
5802        },
5803    }
5804}
5805
5806pub fn describe_alter_index_options(
5807    _: &StatementContext,
5808    _: AlterIndexStatement<Aug>,
5809) -> Result<StatementDesc, PlanError> {
5810    Ok(StatementDesc::new(None))
5811}
5812
5813pub fn describe_drop_owned(
5814    _: &StatementContext,
5815    _: DropOwnedStatement<Aug>,
5816) -> Result<StatementDesc, PlanError> {
5817    Ok(StatementDesc::new(None))
5818}
5819
5820pub fn plan_drop_owned(
5821    scx: &StatementContext,
5822    drop: DropOwnedStatement<Aug>,
5823) -> Result<Plan, PlanError> {
5824    let cascade = drop.cascade();
5825    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5826    let mut drop_ids = Vec::new();
5827    let mut privilege_revokes = Vec::new();
5828    let mut default_privilege_revokes = Vec::new();
5829
5830    fn update_privilege_revokes(
5831        object_id: SystemObjectId,
5832        privileges: &PrivilegeMap,
5833        role_ids: &BTreeSet<RoleId>,
5834        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5835    ) {
5836        privilege_revokes.extend(iter::zip(
5837            iter::repeat(object_id),
5838            privileges
5839                .all_values()
5840                .filter(|privilege| role_ids.contains(&privilege.grantee))
5841                .cloned(),
5842        ));
5843    }
5844
5845    // Replicas
5846    for replica in scx.catalog.get_cluster_replicas() {
5847        if role_ids.contains(&replica.owner_id()) {
5848            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5849        }
5850    }
5851
5852    // Clusters
5853    for cluster in scx.catalog.get_clusters() {
5854        if role_ids.contains(&cluster.owner_id()) {
5855            // Note: CASCADE is not required for replicas.
5856            if !cascade {
5857                let non_owned_bound_objects: Vec<_> = cluster
5858                    .bound_objects()
5859                    .into_iter()
5860                    .map(|item_id| scx.catalog.get_item(item_id))
5861                    .filter(|item| !role_ids.contains(&item.owner_id()))
5862                    .collect();
5863                if !non_owned_bound_objects.is_empty() {
5864                    let names: Vec<_> = non_owned_bound_objects
5865                        .into_iter()
5866                        .map(|item| {
5867                            (
5868                                item.item_type().to_string(),
5869                                scx.catalog.resolve_full_name(item.name()).to_string(),
5870                            )
5871                        })
5872                        .collect();
5873                    return Err(PlanError::DependentObjectsStillExist {
5874                        object_type: "cluster".to_string(),
5875                        object_name: cluster.name().to_string(),
5876                        dependents: names,
5877                    });
5878                }
5879            }
5880            drop_ids.push(cluster.id().into());
5881        }
5882        update_privilege_revokes(
5883            SystemObjectId::Object(cluster.id().into()),
5884            cluster.privileges(),
5885            &role_ids,
5886            &mut privilege_revokes,
5887        );
5888    }
5889
5890    // Items
5891    for item in scx.catalog.get_items() {
5892        if role_ids.contains(&item.owner_id()) {
5893            if !cascade {
5894                // Checks if any items still depend on this one, returning an error if so.
5895                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5896                    let non_owned_dependencies: Vec<_> = used_by
5897                        .into_iter()
5898                        .map(|item_id| scx.catalog.get_item(item_id))
5899                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5900                        .filter(|item| !role_ids.contains(&item.owner_id()))
5901                        .collect();
5902                    if !non_owned_dependencies.is_empty() {
5903                        let names: Vec<_> = non_owned_dependencies
5904                            .into_iter()
5905                            .map(|item| {
5906                                let item_typ = item.item_type().to_string();
5907                                let item_name =
5908                                    scx.catalog.resolve_full_name(item.name()).to_string();
5909                                (item_typ, item_name)
5910                            })
5911                            .collect();
5912                        Err(PlanError::DependentObjectsStillExist {
5913                            object_type: item.item_type().to_string(),
5914                            object_name: scx
5915                                .catalog
5916                                .resolve_full_name(item.name())
5917                                .to_string()
5918                                .to_string(),
5919                            dependents: names,
5920                        })
5921                    } else {
5922                        Ok(())
5923                    }
5924                };
5925
5926                // When this item gets dropped it will also drop its progress source, so we need to
5927                // check the users of those.
5928                if let Some(id) = item.progress_id() {
5929                    let progress_item = scx.catalog.get_item(&id);
5930                    check_if_dependents_exist(progress_item.used_by())?;
5931                }
5932                check_if_dependents_exist(item.used_by())?;
5933            }
5934            drop_ids.push(item.id().into());
5935        }
5936        update_privilege_revokes(
5937            SystemObjectId::Object(item.id().into()),
5938            item.privileges(),
5939            &role_ids,
5940            &mut privilege_revokes,
5941        );
5942    }
5943
5944    // Schemas
5945    for schema in scx.catalog.get_schemas() {
5946        if !schema.id().is_temporary() {
5947            if role_ids.contains(&schema.owner_id()) {
5948                if !cascade {
5949                    let non_owned_dependencies: Vec<_> = schema
5950                        .item_ids()
5951                        .map(|item_id| scx.catalog.get_item(&item_id))
5952                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5953                        .filter(|item| !role_ids.contains(&item.owner_id()))
5954                        .collect();
5955                    if !non_owned_dependencies.is_empty() {
5956                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5957                        sql_bail!(
5958                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5959                            full_schema_name.to_string().quoted()
5960                        );
5961                    }
5962                }
5963                drop_ids.push((*schema.database(), *schema.id()).into())
5964            }
5965            update_privilege_revokes(
5966                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5967                schema.privileges(),
5968                &role_ids,
5969                &mut privilege_revokes,
5970            );
5971        }
5972    }
5973
5974    // Databases
5975    for database in scx.catalog.get_databases() {
5976        if role_ids.contains(&database.owner_id()) {
5977            if !cascade {
5978                let non_owned_schemas: Vec<_> = database
5979                    .schemas()
5980                    .into_iter()
5981                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5982                    .collect();
5983                if !non_owned_schemas.is_empty() {
5984                    sql_bail!(
5985                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5986                        database.name().quoted(),
5987                    );
5988                }
5989            }
5990            drop_ids.push(database.id().into());
5991        }
5992        update_privilege_revokes(
5993            SystemObjectId::Object(database.id().into()),
5994            database.privileges(),
5995            &role_ids,
5996            &mut privilege_revokes,
5997        );
5998    }
5999
6000    // Network policies
6001    for network_policy in scx.catalog.get_network_policies() {
6002        if role_ids.contains(&network_policy.owner_id()) {
6003            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
6004        }
6005        update_privilege_revokes(
6006            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
6007            network_policy.privileges(),
6008            &role_ids,
6009            &mut privilege_revokes,
6010        );
6011    }
6012
6013    // System
6014    update_privilege_revokes(
6015        SystemObjectId::System,
6016        scx.catalog.get_system_privileges(),
6017        &role_ids,
6018        &mut privilege_revokes,
6019    );
6020
6021    for (default_privilege_object, default_privilege_acl_items) in
6022        scx.catalog.get_default_privileges()
6023    {
6024        for default_privilege_acl_item in default_privilege_acl_items {
6025            if role_ids.contains(&default_privilege_object.role_id)
6026                || role_ids.contains(&default_privilege_acl_item.grantee)
6027            {
6028                default_privilege_revokes.push((
6029                    default_privilege_object.clone(),
6030                    default_privilege_acl_item.clone(),
6031                ));
6032            }
6033        }
6034    }
6035
6036    let drop_ids = scx.catalog.object_dependents(&drop_ids);
6037
6038    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
6039    if !system_ids.is_empty() {
6040        let mut owners = system_ids
6041            .into_iter()
6042            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
6043            .collect::<BTreeSet<_>>()
6044            .into_iter()
6045            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
6046        sql_bail!(
6047            "cannot drop objects owned by role {} because they are required by the database system",
6048            owners.join(", "),
6049        );
6050    }
6051
6052    Ok(Plan::DropOwned(DropOwnedPlan {
6053        role_ids: role_ids.into_iter().collect(),
6054        drop_ids,
6055        privilege_revokes,
6056        default_privilege_revokes,
6057    }))
6058}
6059
6060fn plan_retain_history_option(
6061    scx: &StatementContext,
6062    retain_history: Option<OptionalDuration>,
6063) -> Result<Option<CompactionWindow>, PlanError> {
6064    if let Some(OptionalDuration(lcw)) = retain_history {
6065        Ok(Some(plan_retain_history(scx, lcw)?))
6066    } else {
6067        Ok(None)
6068    }
6069}
6070
/// Convert a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
/// `DisableCompaction`. A zero duration will error. This is because the `OptionalDuration` type
/// already converts the zero duration into `None`. This function must not be called in the `RESET
/// (RETAIN HISTORY)` path, which should be handled by the outer `Option<OptionalDuration>` being
/// `None`.
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
        // disable compaction), and should never occur here. Furthermore, some things actually do
        // break when this is set to real zero:
        // https://github.com/MaterializeInc/database-issues/issues/3798.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Error if the duration is low and enable_unlimited_retain_history is not set (which
            // should only be possible during testing).
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction seems
        // to be a bad choice, so prevent it.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}
6120
6121generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6122
6123fn plan_index_options(
6124    scx: &StatementContext,
6125    with_opts: Vec<IndexOption<Aug>>,
6126) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6127    if !with_opts.is_empty() {
6128        // Index options are not durable.
6129        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6130    }
6131
6132    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6133
6134    let mut out = Vec::with_capacity(1);
6135    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6136        out.push(crate::plan::IndexOption::RetainHistory(cw));
6137    }
6138    Ok(out)
6139}
6140
// Derives `TableOptionExtracted`, which parses the table `WITH (...)` options:
// `PARTITION BY`, `RETAIN HISTORY`, and a test-only redacted option.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
6147
6148fn plan_table_options(
6149    scx: &StatementContext,
6150    desc: &RelationDesc,
6151    with_opts: Vec<TableOption<Aug>>,
6152) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6153    let TableOptionExtracted {
6154        partition_by,
6155        retain_history,
6156        redacted_test,
6157        ..
6158    }: TableOptionExtracted = with_opts.try_into()?;
6159
6160    if let Some(partition_by) = partition_by {
6161        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6162        check_partition_by(desc, partition_by)?;
6163    }
6164
6165    if redacted_test.is_some() {
6166        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6167    }
6168
6169    let mut out = Vec::with_capacity(1);
6170    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6171        out.push(crate::plan::TableOption::RetainHistory(cw));
6172    }
6173    Ok(out)
6174}
6175
/// Plans `ALTER INDEX .. SET (..)` and `ALTER INDEX .. RESET (..)`.
///
/// `RETAIN HISTORY` is currently the only index option and must be the sole
/// option in the list. Both SET and RESET delegate to `alter_retain_history`;
/// RESET passes `None` as the value.
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be only option");
                        }
                        // RESET: clear the option by passing `None`.
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be only option");
                        }
                        // SET: forward the user-provided value.
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}
6228
6229pub fn describe_alter_cluster_set_options(
6230    _: &StatementContext,
6231    _: AlterClusterStatement<Aug>,
6232) -> Result<StatementDesc, PlanError> {
6233    Ok(StatementDesc::new(None))
6234}
6235
/// Plans `ALTER CLUSTER .. SET (..) | RESET (..)`.
///
/// Resolves the target cluster (honoring `IF EXISTS`), validates the requested
/// option changes against the cluster's current configuration (managed vs.
/// unmanaged, schedule, replication factor, storage-object constraints), and
/// produces an `AlterClusterPlan` carrying the option deltas plus the chosen
/// alter strategy.
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            // Cluster could not be resolved: emit a notice and plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    // Options not mentioned in the statement keep the value given by
    // `PlanClusterOption::default()`.
    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            // WORKLOAD CLASS may only be set by system users.
            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            // Validate against the *target* managedness: the statement itself
            // may be flipping the cluster between managed and unmanaged.
            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    // Any strategy other than the default is feature gated.
                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    // Non-MANUAL schedules are feature gated.
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        // REPLICATION FACTOR conflicts with a non-MANUAL
                        // schedule, whether given in this statement or already
                        // present on the cluster.
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        // Total number of replicas running is internal replicas
                        // + replication factor.
                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // AlterClusterPlanStrategies that are not None will standup pending replicas of the new configuration
                        // and violate the single replica for sources constraint. If there are any storage objects (sources or sinks) we should
                        // just fail.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    // Unmanaged clusters reject every managed-only option.
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    // Switching managed -> unmanaged requires the schedule to
                    // be explicitly set to MANUAL if it currently is not.
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            // Plan each explicitly declared replica definition.
            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            // Translate every option that was provided into a `Set` delta.
            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            // Same restriction as the SET path: WORKLOAD CLASS is system-only.
            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    // DISK is deprecated; resetting it only emits a notice and
                    // changes nothing.
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}
6488
6489pub fn describe_alter_set_cluster(
6490    _: &StatementContext,
6491    _: AlterSetClusterStatement<Aug>,
6492) -> Result<StatementDesc, PlanError> {
6493    Ok(StatementDesc::new(None))
6494}
6495
/// Plans `ALTER <object> .. SET CLUSTER ..`, which moves an item onto a
/// different cluster. Only materialized views are currently supported.
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    // The entire feature is behind a feature flag.
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Prevent access to `SET CLUSTER` for unsupported objects.
    match object_type {
        ObjectType::MaterializedView => {}
        // These object types do run on clusters, but moving them is not yet
        // supported (see the referenced discussion number).
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        // Everything else has no associated cluster at all, so this can
        // never be supported.
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            // Moving an item onto the cluster it already lives on is a no-op.
            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            // Item could not be resolved: emit a notice and plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6552
6553pub fn describe_alter_object_rename(
6554    _: &StatementContext,
6555    _: AlterObjectRenameStatement,
6556) -> Result<StatementDesc, PlanError> {
6557    Ok(StatementDesc::new(None))
6558}
6559
/// Plans `ALTER <object> .. RENAME TO ..` by dispatching on the object type
/// together with the flavor of unresolved name the parser produced.
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        // All item-level objects share one rename path.
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        // The parser pairs each object type with the matching name flavor;
        // reaching this arm indicates a parser bug.
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}
6596
6597pub fn plan_alter_schema_rename(
6598    scx: &mut StatementContext,
6599    name: UnresolvedSchemaName,
6600    to_schema_name: Ident,
6601    if_exists: bool,
6602) -> Result<Plan, PlanError> {
6603    // Special case for mz_temp: with lazy temporary schema creation, the temp
6604    // schema may not exist yet, but we still need to return the correct error.
6605    // Check the schema name directly against MZ_TEMP_SCHEMA.
6606    let normalized = normalize::unresolved_schema_name(name.clone())?;
6607    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6608        sql_bail!(
6609            "cannot rename schemas in the ambient database: {:?}",
6610            mz_repr::namespaces::MZ_TEMP_SCHEMA
6611        );
6612    }
6613
6614    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6615        let object_type = ObjectType::Schema;
6616        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6617            name: name.to_ast_string_simple(),
6618            object_type,
6619        });
6620        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6621    };
6622
6623    // Make sure the name is unique.
6624    if scx
6625        .resolve_schema_in_database(&db_spec, &to_schema_name)
6626        .is_ok()
6627    {
6628        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6629            to_schema_name.clone().into_string(),
6630        )));
6631    }
6632
6633    // Prevent users from renaming system related schemas.
6634    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6635    if schema.id().is_system() {
6636        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6637    }
6638
6639    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6640        cur_schema_spec: (db_spec, schema_spec),
6641        new_schema_name: to_schema_name.into_string(),
6642    }))
6643}
6644
/// Plans `ALTER SCHEMA .. SWAP WITH ..`.
///
/// `gen_temp_suffix` is invoked with a predicate that reports whether a
/// candidate suffix yields an unused schema name; the resulting temporary
/// name serves as the intermediate for the swap.
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    // Also check name_b (the target schema name)
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    let schema_a = scx.resolve_schema(name_a.clone())?;

    // Both schemas must live in the same, non-ambient database: `name_b` is
    // resolved inside `schema_a`'s database.
    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // We cannot swap system schemas.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary name we can swap schema_a to.
    //
    // 'check' returns if the temp schema name would be valid.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}
6701
/// Plans `ALTER <item> .. RENAME TO ..` for catalog items (views, tables,
/// sources, etc.), checking that the new name does not collide with an
/// existing item or type.
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `ALTER VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // The renamed item keeps its qualifiers; only the final name
            // component changes.
            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // For PostgreSQL compatibility, items and types cannot have
            // overlapping names in a variety of situations. See the comment on
            // `CatalogItemType::conflicts_with_type` for details.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            // Item could not be resolved: emit a notice and plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6770
6771pub fn plan_alter_cluster_rename(
6772    scx: &mut StatementContext,
6773    object_type: ObjectType,
6774    name: Ident,
6775    to_name: Ident,
6776    if_exists: bool,
6777) -> Result<Plan, PlanError> {
6778    match resolve_cluster(scx, &name, if_exists)? {
6779        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6780            id: entry.id(),
6781            name: entry.name().to_string(),
6782            to_name: ident(to_name),
6783        })),
6784        None => {
6785            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6786                name: name.to_ast_string_simple(),
6787                object_type,
6788            });
6789
6790            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6791        }
6792    }
6793}
6794
6795pub fn plan_alter_cluster_swap<F>(
6796    scx: &mut StatementContext,
6797    name_a: Ident,
6798    name_b: Ident,
6799    gen_temp_suffix: F,
6800) -> Result<Plan, PlanError>
6801where
6802    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6803{
6804    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6805    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6806
6807    let check = |temp_suffix: &str| {
6808        let mut temp_name = ident!("mz_schema_swap_");
6809        temp_name.append_lossy(temp_suffix);
6810        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6811            // Temp name does not exist, so we can use it.
6812            Err(CatalogError::UnknownCluster(_)) => true,
6813            // Temp name already exists!
6814            Ok(_) | Err(_) => false,
6815        }
6816    };
6817    let temp_suffix = gen_temp_suffix(&check)?;
6818    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6819
6820    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6821        id_a: cluster_a.id(),
6822        id_b: cluster_b.id(),
6823        name_a: name_a.into_string(),
6824        name_b: name_b.into_string(),
6825        name_temp,
6826    }))
6827}
6828
6829pub fn plan_alter_cluster_replica_rename(
6830    scx: &mut StatementContext,
6831    object_type: ObjectType,
6832    name: QualifiedReplica,
6833    to_item_name: Ident,
6834    if_exists: bool,
6835) -> Result<Plan, PlanError> {
6836    match resolve_cluster_replica(scx, &name, if_exists)? {
6837        Some((cluster, replica)) => {
6838            ensure_cluster_is_not_managed(scx, cluster.id())?;
6839            Ok(Plan::AlterClusterReplicaRename(
6840                AlterClusterReplicaRenamePlan {
6841                    cluster_id: cluster.id(),
6842                    replica_id: replica,
6843                    name: QualifiedReplica {
6844                        cluster: Ident::new(cluster.name())?,
6845                        replica: name.replica,
6846                    },
6847                    to_name: normalize::ident(to_item_name),
6848                },
6849            ))
6850        }
6851        None => {
6852            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6853                name: name.to_ast_string_simple(),
6854                object_type,
6855            });
6856
6857            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6858        }
6859    }
6860}
6861
6862pub fn describe_alter_object_swap(
6863    _: &StatementContext,
6864    _: AlterObjectSwapStatement,
6865) -> Result<StatementDesc, PlanError> {
6866    Ok(StatementDesc::new(None))
6867}
6868
/// Plans `ALTER <object> .. SWAP WITH ..`, supported for schemas and clusters.
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    // The entire feature is behind a feature flag.
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // We'll try 10 times to generate a temporary suffix.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            // Give up after 10 collisions rather than looping forever.
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            // Call the provided closure to make sure this name is unique!
            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    // Dispatch to the schema- or cluster-specific swap planner; anything else
    // is unsupported.
    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}
6915
6916pub fn describe_alter_retain_history(
6917    _: &StatementContext,
6918    _: AlterRetainHistoryStatement<Aug>,
6919) -> Result<StatementDesc, PlanError> {
6920    Ok(StatementDesc::new(None))
6921}
6922
6923pub fn plan_alter_retain_history(
6924    scx: &StatementContext,
6925    AlterRetainHistoryStatement {
6926        object_type,
6927        if_exists,
6928        name,
6929        history,
6930    }: AlterRetainHistoryStatement<Aug>,
6931) -> Result<Plan, PlanError> {
6932    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6933}
6934
/// Shared implementation of `ALTER <object> ... RETAIN HISTORY`, reached from
/// both `plan_alter_retain_history` and `ALTER SOURCE ... (RETAIN HISTORY)`.
///
/// A `history` of `None` means RESET, which restores the default logical
/// compaction window; `Some(RetainHistoryFor(value))` sets an explicit window.
/// Any other option value shape is rejected.
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            // View gets a special error below.
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                // Views were only admitted by the match above so that the
                // materialized-view case could produce the friendlier error.
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // Save the original value so we can write it back down in the create_sql catalog item.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                // None is RESET, so use the default CW.
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            // Object did not resolve (`IF EXISTS` path): emit a notice and
            // plan a no-op instead of erroring.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
7006
/// Shared implementation of `ALTER SOURCE ... SET/RESET (TIMESTAMP INTERVAL)`.
///
/// A `value` of `None` means RESET, which restores the system default
/// timestamp interval; `Some(..)` must be a plain value convertible to a
/// `Duration`, and is validated against the system-configured
/// minimum/maximum bounds.
fn alter_source_timestamp_interval(
    scx: &StatementContext,
    if_exists: bool,
    source_name: UnresolvedItemName,
    value: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Source;
    match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            if entry.item_type() != CatalogItemType::Source {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            match value {
                Some(val) => {
                    // Only bare values are acceptable here; sequences, secrets,
                    // etc. cannot be parsed as an interval.
                    let val = match val {
                        WithOptionValue::Value(v) => v,
                        _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
                    };
                    // Clone so the original `val` can be stored in the plan
                    // (it is written back into the catalog's create_sql).
                    let duration = Duration::try_from_value(val.clone())?;

                    // Enforce the system-configured bounds on the interval.
                    let min = scx.catalog.system_vars().min_timestamp_interval();
                    let max = scx.catalog.system_vars().max_timestamp_interval();
                    if duration < min || duration > max {
                        return Err(PlanError::InvalidTimestampInterval {
                            min,
                            max,
                            requested: duration,
                        });
                    }

                    Ok(Plan::AlterSourceTimestampInterval(
                        AlterSourceTimestampIntervalPlan {
                            id: entry.id(),
                            value: Some(val),
                            interval: duration,
                        },
                    ))
                }
                None => {
                    // RESET: fall back to the system default interval. No
                    // bounds check is applied here; the default is trusted.
                    let interval = scx.catalog.system_vars().default_timestamp_interval();
                    Ok(Plan::AlterSourceTimestampInterval(
                        AlterSourceTimestampIntervalPlan {
                            id: entry.id(),
                            value: None,
                            interval,
                        },
                    ))
                }
            }
        }
        None => {
            // Source did not resolve (`IF EXISTS` path): notice + no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: source_name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
7074
7075pub fn describe_alter_secret_options(
7076    _: &StatementContext,
7077    _: AlterSecretStatement<Aug>,
7078) -> Result<StatementDesc, PlanError> {
7079    Ok(StatementDesc::new(None))
7080}
7081
7082pub fn plan_alter_secret(
7083    scx: &mut StatementContext,
7084    stmt: AlterSecretStatement<Aug>,
7085) -> Result<Plan, PlanError> {
7086    let AlterSecretStatement {
7087        name,
7088        if_exists,
7089        value,
7090    } = stmt;
7091    let object_type = ObjectType::Secret;
7092    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7093        Some(entry) => entry.id(),
7094        None => {
7095            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7096                name: name.to_string(),
7097                object_type,
7098            });
7099
7100            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7101        }
7102    };
7103
7104    let secret_as = query::plan_secret_as(scx, value)?;
7105
7106    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7107}
7108
7109pub fn describe_alter_connection(
7110    _: &StatementContext,
7111    _: AlterConnectionStatement<Aug>,
7112) -> Result<StatementDesc, PlanError> {
7113    Ok(StatementDesc::new(None))
7114}
7115
// Extracts the `VALIDATE` boolean from `ALTER CONNECTION ... WITH (...)`.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7117
7118pub fn plan_alter_connection(
7119    scx: &StatementContext,
7120    stmt: AlterConnectionStatement<Aug>,
7121) -> Result<Plan, PlanError> {
7122    let AlterConnectionStatement {
7123        name,
7124        if_exists,
7125        actions,
7126        with_options,
7127    } = stmt;
7128    let conn_name = normalize::unresolved_item_name(name)?;
7129    let entry = match scx.catalog.resolve_item(&conn_name) {
7130        Ok(entry) => entry,
7131        Err(_) if if_exists => {
7132            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7133                name: conn_name.to_string(),
7134                object_type: ObjectType::Sink,
7135            });
7136
7137            return Ok(Plan::AlterNoop(AlterNoopPlan {
7138                object_type: ObjectType::Connection,
7139            }));
7140        }
7141        Err(e) => return Err(e.into()),
7142    };
7143
7144    let connection = entry.connection()?;
7145
7146    if actions
7147        .iter()
7148        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7149    {
7150        if actions.len() > 1 {
7151            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7152        }
7153
7154        if !with_options.is_empty() {
7155            sql_bail!(
7156                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7157                with_options
7158                    .iter()
7159                    .map(|o| o.to_ast_string_simple())
7160                    .join(", ")
7161            );
7162        }
7163
7164        if !matches!(connection, Connection::Ssh(_)) {
7165            sql_bail!(
7166                "{} is not an SSH connection",
7167                scx.catalog.resolve_full_name(entry.name())
7168            )
7169        }
7170
7171        return Ok(Plan::AlterConnection(AlterConnectionPlan {
7172            id: entry.id(),
7173            action: crate::plan::AlterConnectionAction::RotateKeys,
7174        }));
7175    }
7176
7177    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7178    if options.validate.is_some() {
7179        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7180    }
7181
7182    let validate = match options.validate {
7183        Some(val) => val,
7184        None => {
7185            scx.catalog
7186                .system_vars()
7187                .enable_default_connection_validation()
7188                && connection.validate_by_default()
7189        }
7190    };
7191
7192    let connection_type = match connection {
7193        Connection::Aws(_) => CreateConnectionType::Aws,
7194        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7195        Connection::Kafka(_) => CreateConnectionType::Kafka,
7196        Connection::Csr(_) => CreateConnectionType::Csr,
7197        Connection::Postgres(_) => CreateConnectionType::Postgres,
7198        Connection::Ssh(_) => CreateConnectionType::Ssh,
7199        Connection::MySql(_) => CreateConnectionType::MySql,
7200        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7201        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7202    };
7203
7204    // Collect all options irrespective of action taken on them.
7205    let specified_options: BTreeSet<_> = actions
7206        .iter()
7207        .map(|action: &AlterConnectionAction<Aug>| match action {
7208            AlterConnectionAction::SetOption(option) => option.name.clone(),
7209            AlterConnectionAction::DropOption(name) => name.clone(),
7210            AlterConnectionAction::RotateKeys => unreachable!(),
7211        })
7212        .collect();
7213
7214    for invalid in INALTERABLE_OPTIONS {
7215        if specified_options.contains(invalid) {
7216            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7217        }
7218    }
7219
7220    connection::validate_options_per_connection_type(connection_type, specified_options)?;
7221
7222    // Partition operations into set and drop
7223    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
7224        actions.into_iter().partition_map(|action| match action {
7225            AlterConnectionAction::SetOption(option) => Either::Left(option),
7226            AlterConnectionAction::DropOption(name) => Either::Right(name),
7227            AlterConnectionAction::RotateKeys => unreachable!(),
7228        });
7229
7230    let set_options: BTreeMap<_, _> = set_options_vec
7231        .clone()
7232        .into_iter()
7233        .map(|option| (option.name, option.value))
7234        .collect();
7235
7236    // Type check values + avoid duplicates; we don't want to e.g. let users
7237    // drop and set the same option in the same statement, so treating drops as
7238    // sets here is fine.
7239    let connection_options_extracted =
7240        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7241
7242    let duplicates: Vec<_> = connection_options_extracted
7243        .seen
7244        .intersection(&drop_options)
7245        .collect();
7246
7247    if !duplicates.is_empty() {
7248        sql_bail!(
7249            "cannot both SET and DROP/RESET options {}",
7250            duplicates
7251                .iter()
7252                .map(|option| option.to_string())
7253                .join(", ")
7254        )
7255    }
7256
7257    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7258        let set_options_count = mutually_exclusive_options
7259            .iter()
7260            .filter(|o| set_options.contains_key(o))
7261            .count();
7262        let drop_options_count = mutually_exclusive_options
7263            .iter()
7264            .filter(|o| drop_options.contains(o))
7265            .count();
7266
7267        // Disallow setting _and_ resetting mutually exclusive options
7268        if set_options_count > 0 && drop_options_count > 0 {
7269            sql_bail!(
7270                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7271                connection_type,
7272                mutually_exclusive_options
7273                    .iter()
7274                    .map(|option| option.to_string())
7275                    .join(", ")
7276            )
7277        }
7278
7279        // If any option is either set or dropped, ensure all mutually exclusive
7280        // options are dropped. We do this "behind the scenes", even though we
7281        // disallow users from performing the same action because this is the
7282        // mechanism by which we overwrite values elsewhere in the code.
7283        if set_options_count > 0 || drop_options_count > 0 {
7284            drop_options.extend(mutually_exclusive_options.iter().cloned());
7285        }
7286
7287        // n.b. if mutually exclusive options are set, those will error when we
7288        // try to replan the connection.
7289    }
7290
7291    Ok(Plan::AlterConnection(AlterConnectionPlan {
7292        id: entry.id(),
7293        action: crate::plan::AlterConnectionAction::AlterOptions {
7294            set_options,
7295            drop_options,
7296            validate,
7297        },
7298    }))
7299}
7300
7301pub fn describe_alter_sink(
7302    _: &StatementContext,
7303    _: AlterSinkStatement<Aug>,
7304) -> Result<StatementDesc, PlanError> {
7305    Ok(StatementDesc::new(None))
7306}
7307
/// Plans `ALTER SINK`.
///
/// Only `ALTER SINK ... SET FROM <relation>` (`ChangeRelation`) is supported;
/// it is implemented by re-parsing the sink's stored `CREATE SINK` statement,
/// swapping in the new `FROM` relation, and re-planning the result so the new
/// configuration is fully validated. `SET`/`RESET` of options is rejected.
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    // Sink did not resolve (`IF EXISTS` path): notice + no-op.
    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always ALTER objects from their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // First we reconstruct the original CREATE SINK statement
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Then resolve and swap the resolved from relation to the new one
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            // Finally re-plan the modified create sink statement to verify the new configuration is valid
            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            // Bump the sink's version so downstream machinery can tell the
            // altered definition apart from the original.
            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}
7367
7368pub fn describe_alter_source(
7369    _: &StatementContext,
7370    _: AlterSourceStatement<Aug>,
7371) -> Result<StatementDesc, PlanError> {
7372    // TODO: put the options here, right?
7373    Ok(StatementDesc::new(None))
7374}
7375
// Options for `ALTER SOURCE ... ADD SUBSOURCE`. NOTE(review): the
// `AddSubsources` arm of `plan_alter_source` is unreachable here, so these
// appear to be consumed during purification rather than planning — confirm.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7382
/// Plans `ALTER SOURCE`.
///
/// Only the `RETAIN HISTORY` and `TIMESTAMP INTERVAL` options may be SET or
/// RESET here, and each must be the only option in the statement; both
/// delegate to shared helpers. Subsource-related actions are either rejected
/// outright or expected to have been rewritten during purification.
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    // Source did not resolve (`IF EXISTS` path): notice + no-op.
    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            // The parser guarantees at least one option, hence the unwrap.
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            if option.name == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
            }
            // n.b we use this statement in purification in a way that cannot be
            // planned directly.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            // Same shape as SetOptions above, but passing `None` to the
            // helpers signals RESET-to-default.
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            if option == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}
7469
7470pub fn describe_alter_system_set(
7471    _: &StatementContext,
7472    _: AlterSystemSetStatement,
7473) -> Result<StatementDesc, PlanError> {
7474    Ok(StatementDesc::new(None))
7475}
7476
7477pub fn plan_alter_system_set(
7478    _: &StatementContext,
7479    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7480) -> Result<Plan, PlanError> {
7481    let name = name.to_string();
7482    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7483        name,
7484        value: scl::plan_set_variable_to(to)?,
7485    }))
7486}
7487
7488pub fn describe_alter_system_reset(
7489    _: &StatementContext,
7490    _: AlterSystemResetStatement,
7491) -> Result<StatementDesc, PlanError> {
7492    Ok(StatementDesc::new(None))
7493}
7494
7495pub fn plan_alter_system_reset(
7496    _: &StatementContext,
7497    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7498) -> Result<Plan, PlanError> {
7499    let name = name.to_string();
7500    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7501}
7502
7503pub fn describe_alter_system_reset_all(
7504    _: &StatementContext,
7505    _: AlterSystemResetAllStatement,
7506) -> Result<StatementDesc, PlanError> {
7507    Ok(StatementDesc::new(None))
7508}
7509
7510pub fn plan_alter_system_reset_all(
7511    _: &StatementContext,
7512    _: AlterSystemResetAllStatement,
7513) -> Result<Plan, PlanError> {
7514    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7515}
7516
7517pub fn describe_alter_role(
7518    _: &StatementContext,
7519    _: AlterRoleStatement<Aug>,
7520) -> Result<StatementDesc, PlanError> {
7521    Ok(StatementDesc::new(None))
7522}
7523
7524pub fn plan_alter_role(
7525    scx: &StatementContext,
7526    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7527) -> Result<Plan, PlanError> {
7528    let option = match option {
7529        AlterRoleOption::Attributes(attrs) => {
7530            let attrs = plan_role_attributes(attrs, scx)?;
7531            PlannedAlterRoleOption::Attributes(attrs)
7532        }
7533        AlterRoleOption::Variable(variable) => {
7534            let var = plan_role_variable(variable)?;
7535            PlannedAlterRoleOption::Variable(var)
7536        }
7537    };
7538
7539    Ok(Plan::AlterRole(AlterRolePlan {
7540        id: name.id,
7541        name: name.name,
7542        option,
7543    }))
7544}
7545
7546pub fn describe_alter_table_add_column(
7547    _: &StatementContext,
7548    _: AlterTableAddColumnStatement<Aug>,
7549) -> Result<StatementDesc, PlanError> {
7550    Ok(StatementDesc::new(None))
7551}
7552
/// Plans `ALTER TABLE ... ADD COLUMN`, gated behind the
/// `ENABLE_ALTER_TABLE_ADD_COLUMN` feature flag.
///
/// Missing tables (`IF EXISTS`) and pre-existing columns
/// (`IF NOT EXISTS` on the column) both degrade to a notice plus a no-op
/// plan; otherwise a duplicate column is a hard error.
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                // Always add columns to the latest version of the item.
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.relation_desc().expect("table has desc").into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                // Table did not resolve (`IF EXISTS` path): notice + no-op.
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    // Reject (or, with IF NOT EXISTS, no-op on) a duplicate column name.
    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // TODO(alter_table): Support non-nullable columns with default values.
    let column_type = scalar_type.nullable(true);
    // "unresolve" our data type so we can later update the persisted create_sql.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}
7615
7616pub fn describe_alter_materialized_view_apply_replacement(
7617    _: &StatementContext,
7618    _: AlterMaterializedViewApplyReplacementStatement,
7619) -> Result<StatementDesc, PlanError> {
7620    Ok(StatementDesc::new(None))
7621}
7622
/// Plans `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`, gated behind the
/// `ENABLE_REPLACEMENT_MATERIALIZED_VIEWS` feature flag.
///
/// The named replacement must exist and must have been created as a
/// replacement targeting the materialized view being altered.
pub fn plan_alter_materialized_view_apply_replacement(
    scx: &StatementContext,
    stmt: AlterMaterializedViewApplyReplacementStatement,
) -> Result<Plan, PlanError> {
    let AlterMaterializedViewApplyReplacementStatement {
        if_exists,
        name,
        replacement_name,
    } = stmt;

    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

    // Target MV did not resolve (`IF EXISTS` path): notice + no-op.
    let object_type = ObjectType::MaterializedView;
    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // The replacement itself is resolved with `if_exists: false`, so a
    // missing replacement errors inside `resolve_item_or_type` and the
    // `expect` below can never fire.
    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
        .expect("if_exists not set");

    // The replacement must have been declared as replacing exactly this MV.
    if replacement.replacement_target() != Some(mv.id()) {
        return Err(PlanError::InvalidReplacement {
            item_type: mv.item_type(),
            item_name: scx.catalog.minimal_qualification(mv.name()),
            replacement_type: replacement.item_type(),
            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
        });
    }

    Ok(Plan::AlterMaterializedViewApplyReplacement(
        AlterMaterializedViewApplyReplacementPlan {
            id: mv.id(),
            replacement_id: replacement.id(),
        },
    ))
}
7663
7664pub fn describe_comment(
7665    _: &StatementContext,
7666    _: CommentStatement<Aug>,
7667) -> Result<StatementDesc, PlanError> {
7668    Ok(StatementDesc::new(None))
7669}
7670
7671pub fn plan_comment(
7672    scx: &mut StatementContext,
7673    stmt: CommentStatement<Aug>,
7674) -> Result<Plan, PlanError> {
7675    const MAX_COMMENT_LENGTH: usize = 1024;
7676
7677    let CommentStatement { object, comment } = stmt;
7678
7679    // TODO(parkmycar): Make max comment length configurable.
7680    if let Some(c) = &comment {
7681        if c.len() > 1024 {
7682            return Err(PlanError::CommentTooLong {
7683                length: c.len(),
7684                max_size: MAX_COMMENT_LENGTH,
7685            });
7686        }
7687    }
7688
7689    let (object_id, column_pos) = match &object {
7690        com_ty @ CommentObjectType::Table { name }
7691        | com_ty @ CommentObjectType::View { name }
7692        | com_ty @ CommentObjectType::MaterializedView { name }
7693        | com_ty @ CommentObjectType::Index { name }
7694        | com_ty @ CommentObjectType::Func { name }
7695        | com_ty @ CommentObjectType::Connection { name }
7696        | com_ty @ CommentObjectType::Source { name }
7697        | com_ty @ CommentObjectType::Sink { name }
7698        | com_ty @ CommentObjectType::Secret { name }
7699        | com_ty @ CommentObjectType::ContinualTask { name } => {
7700            let item = scx.get_item_by_resolved_name(name)?;
7701            match (com_ty, item.item_type()) {
7702                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7703                    (CommentObjectId::Table(item.id()), None)
7704                }
7705                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7706                    (CommentObjectId::View(item.id()), None)
7707                }
7708                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7709                    (CommentObjectId::MaterializedView(item.id()), None)
7710                }
7711                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7712                    (CommentObjectId::Index(item.id()), None)
7713                }
7714                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7715                    (CommentObjectId::Func(item.id()), None)
7716                }
7717                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7718                    (CommentObjectId::Connection(item.id()), None)
7719                }
7720                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7721                    (CommentObjectId::Source(item.id()), None)
7722                }
7723                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7724                    (CommentObjectId::Sink(item.id()), None)
7725                }
7726                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7727                    (CommentObjectId::Secret(item.id()), None)
7728                }
7729                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7730                    (CommentObjectId::ContinualTask(item.id()), None)
7731                }
7732                (com_ty, cat_ty) => {
7733                    let expected_type = match com_ty {
7734                        CommentObjectType::Table { .. } => ObjectType::Table,
7735                        CommentObjectType::View { .. } => ObjectType::View,
7736                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7737                        CommentObjectType::Index { .. } => ObjectType::Index,
7738                        CommentObjectType::Func { .. } => ObjectType::Func,
7739                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7740                        CommentObjectType::Source { .. } => ObjectType::Source,
7741                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7742                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7743                        _ => unreachable!("these are the only types we match on"),
7744                    };
7745
7746                    return Err(PlanError::InvalidObjectType {
7747                        expected_type: SystemObjectType::Object(expected_type),
7748                        actual_type: SystemObjectType::Object(cat_ty.into()),
7749                        object_name: item.name().item.clone(),
7750                    });
7751                }
7752            }
7753        }
7754        CommentObjectType::Type { ty } => match ty {
7755            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7756                sql_bail!("cannot comment on anonymous list or map type");
7757            }
7758            ResolvedDataType::Named { id, modifiers, .. } => {
7759                if !modifiers.is_empty() {
7760                    sql_bail!("cannot comment on type with modifiers");
7761                }
7762                (CommentObjectId::Type(*id), None)
7763            }
7764            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7765        },
7766        CommentObjectType::Column { name } => {
7767            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7768            match item.item_type() {
7769                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7770                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7771                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7772                CatalogItemType::MaterializedView => {
7773                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7774                }
7775                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7776                r => {
7777                    return Err(PlanError::Unsupported {
7778                        feature: format!("Specifying comments on a column of {r}"),
7779                        discussion_no: None,
7780                    });
7781                }
7782            }
7783        }
7784        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7785        CommentObjectType::Database { name } => {
7786            (CommentObjectId::Database(*name.database_id()), None)
7787        }
7788        CommentObjectType::Schema { name } => {
7789            // Temporary schemas cannot have comments - they are connection-specific
7790            // and transient. With lazy temporary schema creation, the temp schema
7791            // may not exist yet, but we still need to return the correct error.
7792            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7793                sql_bail!(
7794                    "cannot comment on schema {} because it is a temporary schema",
7795                    mz_repr::namespaces::MZ_TEMP_SCHEMA
7796                );
7797            }
7798            (
7799                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7800                None,
7801            )
7802        }
7803        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7804        CommentObjectType::ClusterReplica { name } => {
7805            let replica = scx.catalog.resolve_cluster_replica(name)?;
7806            (
7807                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7808                None,
7809            )
7810        }
7811        CommentObjectType::NetworkPolicy { name } => {
7812            (CommentObjectId::NetworkPolicy(name.id), None)
7813        }
7814    };
7815
7816    // Note: the `mz_comments` table uses an `Int4` for the column position, but in the catalog storage we
7817    // store a `usize` which would be a `Uint8`. We guard against a safe conversion here because
7818    // it's the easiest place to raise an error.
7819    //
7820    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7821    if let Some(p) = column_pos {
7822        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7823            max_num_columns: MAX_NUM_COLUMNS,
7824            req_num_columns: p,
7825        })?;
7826    }
7827
7828    Ok(Plan::Comment(CommentPlan {
7829        object_id,
7830        sub_component: column_pos,
7831        comment,
7832    }))
7833}
7834
7835pub(crate) fn resolve_cluster<'a>(
7836    scx: &'a StatementContext,
7837    name: &'a Ident,
7838    if_exists: bool,
7839) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7840    match scx.resolve_cluster(Some(name)) {
7841        Ok(cluster) => Ok(Some(cluster)),
7842        Err(_) if if_exists => Ok(None),
7843        Err(e) => Err(e),
7844    }
7845}
7846
7847pub(crate) fn resolve_cluster_replica<'a>(
7848    scx: &'a StatementContext,
7849    name: &QualifiedReplica,
7850    if_exists: bool,
7851) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7852    match scx.resolve_cluster(Some(&name.cluster)) {
7853        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7854            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7855            None if if_exists => Ok(None),
7856            None => Err(sql_err!(
7857                "CLUSTER {} has no CLUSTER REPLICA named {}",
7858                cluster.name(),
7859                name.replica.as_str().quoted(),
7860            )),
7861        },
7862        Err(_) if if_exists => Ok(None),
7863        Err(e) => Err(e),
7864    }
7865}
7866
7867pub(crate) fn resolve_database<'a>(
7868    scx: &'a StatementContext,
7869    name: &'a UnresolvedDatabaseName,
7870    if_exists: bool,
7871) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7872    match scx.resolve_database(name) {
7873        Ok(database) => Ok(Some(database)),
7874        Err(_) if if_exists => Ok(None),
7875        Err(e) => Err(e),
7876    }
7877}
7878
7879pub(crate) fn resolve_schema<'a>(
7880    scx: &'a StatementContext,
7881    name: UnresolvedSchemaName,
7882    if_exists: bool,
7883) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7884    match scx.resolve_schema(name) {
7885        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7886        Err(_) if if_exists => Ok(None),
7887        Err(e) => Err(e),
7888    }
7889}
7890
7891pub(crate) fn resolve_network_policy<'a>(
7892    scx: &'a StatementContext,
7893    name: Ident,
7894    if_exists: bool,
7895) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7896    match scx.catalog.resolve_network_policy(&name.to_string()) {
7897        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7898            id: policy.id(),
7899            name: policy.name().to_string(),
7900        })),
7901        Err(_) if if_exists => Ok(None),
7902        Err(e) => Err(e.into()),
7903    }
7904}
7905
7906pub(crate) fn resolve_item_or_type<'a>(
7907    scx: &'a StatementContext,
7908    object_type: ObjectType,
7909    name: UnresolvedItemName,
7910    if_exists: bool,
7911) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7912    let name = normalize::unresolved_item_name(name)?;
7913    let catalog_item = match object_type {
7914        ObjectType::Type => scx.catalog.resolve_type(&name),
7915        _ => scx.catalog.resolve_item(&name),
7916    };
7917
7918    match catalog_item {
7919        Ok(item) => {
7920            let is_type = ObjectType::from(item.item_type());
7921            if object_type == is_type {
7922                Ok(Some(item))
7923            } else {
7924                Err(PlanError::MismatchedObjectType {
7925                    name: scx.catalog.minimal_qualification(item.name()),
7926                    is_type,
7927                    expected_type: object_type,
7928                })
7929            }
7930        }
7931        Err(_) if if_exists => Ok(None),
7932        Err(e) => Err(e.into()),
7933    }
7934}
7935
7936/// Returns an error if the given cluster is a managed cluster
7937fn ensure_cluster_is_not_managed(
7938    scx: &StatementContext,
7939    cluster_id: ClusterId,
7940) -> Result<(), PlanError> {
7941    let cluster = scx.catalog.get_cluster(cluster_id);
7942    if cluster.is_managed() {
7943        Err(PlanError::ManagedCluster {
7944            cluster_name: cluster.name().to_string(),
7945        })
7946    } else {
7947        Ok(())
7948    }
7949}