mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, IcebergSinkConfigOption, Ident,
    IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, KeyConstraint,
    LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption,
    MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, NetworkPolicyOption,
    NetworkPolicyOptionName, NetworkPolicyRuleDefinition, NetworkPolicyRuleOption,
    NetworkPolicyRuleOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema,
    QualifiedReplica, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue,
    ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar,
    SourceErrorPolicy, SourceIncludeMetadata, SqlServerConfigOption, SqlServerConfigOptionName,
    Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption,
    TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName,
    UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
    WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
    PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
    Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
    WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
    transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

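// Managed replica names look like "r" followed by digits: for example
// (illustrative), "r1" and "r12" match the pattern below, while "r" alone and
// "replica1" do not.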
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns are types that have meaningful Persist-level ordering.
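///
/// For example (hypothetical schema): given a relation with columns
/// `(ts timestamp, id int, note text)`, `PARTITION BY (ts)` and
/// `PARTITION BY (ts, id)` pass both checks, while `PARTITION BY (id)` fails
/// because the listed columns are not a prefix of the relation's columns.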
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

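/// Plans `CREATE SCHEMA`. The schema name may have at most two components: for
/// example (illustrative), `CREATE SCHEMA my_db.my_schema` names the database
/// explicitly, while `CREATE SCHEMA my_schema` falls back to the session's
/// active database.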
pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

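/// Plans `CREATE TABLE`, e.g. (illustrative)
/// `CREATE TABLE t (a int NOT NULL, b text DEFAULT 'hi')`: the declared types,
/// `NOT NULL`/`DEFAULT` column options, and any key constraints below all feed
/// into the resulting relation description.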
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

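    // Illustrative example: for `CREATE TABLE t (a int PRIMARY KEY, b int NOT NULL,
    // UNIQUE (b))`, the column options above contributed the key `[0]` (and made
    // `a` non-nullable); the loop below appends `[1]` for the UNIQUE constraint. A
    // table-level PRIMARY KEY is instead inserted at the front of `keys`.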
    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

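// Each `generate_extracted_config!` invocation below generates a
// `<Name>OptionExtracted` struct (e.g. `CreateSourceOptionExtracted`) with one
// typed field per listed option plus a `seen` set, and a `TryFrom` impl over the
// raw option list; the planners in this file destructure those structs.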
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

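/// Plans `CREATE SOURCE ... FROM WEBHOOK`, e.g. (illustrative)
/// `CREATE SOURCE hooks FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS`.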
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];
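
    // Illustrative shape (assuming `BODY FORMAT JSON INCLUDE HEADERS`): the
    // relation so far is just `body` (the request payload); the blocks below add
    // a `headers` map column and one column per explicitly mapped header.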

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

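/// Plans `CREATE SOURCE` for external systems (Kafka, PostgreSQL, MySQL,
/// SQL Server, and load generators), e.g. (illustrative)
/// `CREATE SOURCE src FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')`.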
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all options related to the primary
    // source output must be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

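/// Dispatches on the `CREATE SOURCE ... FROM <connection>` variant, delegating
/// to the connection-specific `plan_*_source_connection` helpers below.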
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // text/exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // text columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        // exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _, // purified into `start_offset`
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
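    // Each requested `INCLUDE <metadata>` item becomes a named metadata column;
    // for example (illustrative), `INCLUDE PARTITION AS kafka_partition, OFFSET`
    // yields columns named `kafka_partition` and `offset`.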
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // handled below
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is overly conservative.
            // For example, in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
1330            let key_desc = key_desc.map(|desc| {
1331                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1332                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1333                if is_kafka && is_envelope_none {
1334                    RelationDesc::from_names_and_types(
1335                        desc.into_iter()
1336                            .map(|(name, typ)| (name, typ.nullable(true))),
1337                    )
1338                } else {
1339                    desc
1340                }
1341            });
1342            (key_desc, value_desc)
1343        }
1344        None => (key_desc, value_desc),
1345    };
1346
1347    // The KEY VALUE load generator is the only UPSERT source that
1348    // has no encoding but defaults to `INCLUDE KEY`.
1349    //
1350    // As discussed
1351    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1352    // removing this special case amounts to deciding how to handle null keys
1353    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
1354    // this special case in.
1355    //
1356    // Note that this is safe because this generator is
1357    // 1. The only source with no encoding that can have its key included.
1358    // 2. Never produces null keys (or values, for that matter).
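    // For illustration only (the option values here are hypothetical), such a
    // source might be declared as:
    //
    //     CREATE SOURCE kv FROM LOAD GENERATOR KEY VALUE
    //         (KEYS 16, PARTITIONS 4, BATCH SIZE 4, VALUE SIZE 16,
    //          SNAPSHOT ROUNDS 1, SEED 0)
    //         INCLUDE KEY;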
1359    let key_envelope_no_encoding = matches!(
1360        source_connection,
1361        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1362            load_generator: LoadGenerator::KeyValue(_),
1363            ..
1364        })
1365    );
1366    let mut key_envelope = get_key_envelope(
1367        include_metadata,
1368        encoding.as_ref(),
1369        key_envelope_no_encoding,
1370    )?;
1371
1372    match (&envelope, &key_envelope) {
1373        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1374        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1375            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1376        ),
1377        _ => {}
1378    };
1379
1380    // Not all source envelopes are compatible with all source connections.
1381    // Whoever constructs the source ingestion pipeline is responsible for
1382    // choosing compatible envelopes and connections.
1383    //
1384    // TODO(guswynn): unambiguously assert which connections and envelopes are
1385    // compatible in typechecking
1386    //
1387    // TODO: remove bails as more support for upsert is added.
1388    let envelope = match &envelope {
1389        // TODO: fixup key envelope
1390        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1391        ast::SourceEnvelope::Debezium => {
1392            // TODO: check that key envelope is not set
1393            let after_idx = match typecheck_debezium(&value_desc) {
1394                Ok((_before_idx, after_idx)) => Ok(after_idx),
1395                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1396                    Some(DataEncoding::Avro(_)) => Err(type_err),
1397                    _ => Err(sql_err!(
1398                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1399                    )),
1400                },
1401            }?;
1402
1403            UnplannedSourceEnvelope::Upsert {
1404                style: UpsertStyle::Debezium { after_idx },
1405            }
1406        }
1407        ast::SourceEnvelope::Upsert {
1408            value_decode_err_policy,
1409        } => {
1410            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1411                None => {
1412                    if !key_envelope_no_encoding {
1413                        bail_unsupported!(format!(
1414                            "UPSERT requires a key/value format: {:?}",
1415                            format
1416                        ))
1417                    }
1418                    None
1419                }
1420                Some(key_encoding) => Some(key_encoding),
1421            };
1422            // `ENVELOPE UPSERT` implies `INCLUDE KEY` if it is not
1423            // explicitly specified.
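            // For example, a bare `ENVELOPE UPSERT` with a key format behaves
            // as if `INCLUDE KEY` had been written, with the key columns named
            // by `get_unnamed_key_envelope` below.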
1424            if key_envelope == KeyEnvelope::None {
1425                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1426            }
1427            // If the value decode error policy is not set, we use the default upsert style.
1428            let style = match value_decode_err_policy.as_slice() {
1429                [] => UpsertStyle::Default(key_envelope),
1430                [SourceErrorPolicy::Inline { alias }] => {
1431                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1432                    UpsertStyle::ValueErrInline {
1433                        key_envelope,
1434                        error_column: alias
1435                            .as_ref()
1436                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1437                    }
1438                }
1439                _ => {
1440                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1441                }
1442            };
1443
1444            UnplannedSourceEnvelope::Upsert { style }
1445        }
1446        ast::SourceEnvelope::CdcV2 => {
1447            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1448            // TODO: check that key envelope is not set
1449            match format {
1450                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1451                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1452            }
1453            UnplannedSourceEnvelope::CdcV2
1454        }
1455    };
1456
1457    let metadata_desc = included_column_desc(metadata_columns_desc);
1458    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1459
1460    Ok((desc, envelope, encoding))
1461}
1462
1463/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1464/// of columns and constraints.
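/// For example (illustrative), columns `(a int4 NOT NULL, b text)` with a
/// `PRIMARY KEY (a)` constraint plan to a two-column `RelationDesc` whose
/// first key is `[a]`, with `a` marked non-nullable.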
1465fn plan_source_export_desc(
1466    scx: &StatementContext,
1467    name: &UnresolvedItemName,
1468    columns: &Vec<ColumnDef<Aug>>,
1469    constraints: &Vec<TableConstraint<Aug>>,
1470) -> Result<RelationDesc, PlanError> {
1471    let names: Vec<_> = columns
1472        .iter()
1473        .map(|c| normalize::column_name(c.name.clone()))
1474        .collect();
1475
1476    if let Some(dup) = names.iter().duplicates().next() {
1477        sql_bail!("column {} specified more than once", dup.quoted());
1478    }
1479
1480    // Build initial relation type that handles declared data types
1481    // and NOT NULL constraints.
1482    let mut column_types = Vec::with_capacity(columns.len());
1483    let mut keys = Vec::new();
1484
1485    for (i, c) in columns.into_iter().enumerate() {
1486        let aug_data_type = &c.data_type;
1487        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1488        let mut nullable = true;
1489        for option in &c.options {
1490            match &option.option {
1491                ColumnOption::NotNull => nullable = false,
1492                ColumnOption::Default(_) => {
1493                    bail_unsupported!("Source export with default value")
1494                }
1495                ColumnOption::Unique { is_primary } => {
1496                    keys.push(vec![i]);
1497                    if *is_primary {
1498                        nullable = false;
1499                    }
1500                }
1501                other => {
1502                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1503                }
1504            }
1505        }
1506        column_types.push(ty.nullable(nullable));
1507    }
1508
1509    let mut seen_primary = false;
1510    'c: for constraint in constraints {
1511        match constraint {
1512            TableConstraint::Unique {
1513                name: _,
1514                columns,
1515                is_primary,
1516                nulls_not_distinct,
1517            } => {
1518                if seen_primary && *is_primary {
1519                    sql_bail!(
1520                        "multiple primary keys for source export {} are not allowed",
1521                        name.to_ast_string_stable()
1522                    );
1523                }
1524                seen_primary = *is_primary || seen_primary;
1525
1526                let mut key = vec![];
1527                for column in columns {
1528                    let column = normalize::column_name(column.clone());
1529                    match names.iter().position(|name| *name == column) {
1530                        None => sql_bail!("unknown column in constraint: {}", column),
1531                        Some(i) => {
1532                            let nullable = &mut column_types[i].nullable;
1533                            if *is_primary {
1534                                if *nulls_not_distinct {
1535                                    sql_bail!(
1536                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1537                                    );
1538                                }
1539                                *nullable = false;
1540                            } else if !(*nulls_not_distinct || !*nullable) {
1541                                // Non-primary key unique constraints are only keys if all of their
1542                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
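                                // For example, `UNIQUE (b)` on a nullable `b`
                                // does not yield a usable key, while
                                // `UNIQUE NULLS NOT DISTINCT (b)` does.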
1543                                break 'c;
1544                            }
1545
1546                            key.push(i);
1547                        }
1548                    }
1549                }
1550
1551                if *is_primary {
1552                    keys.insert(0, key);
1553                } else {
1554                    keys.push(key);
1555                }
1556            }
1557            TableConstraint::ForeignKey { .. } => {
1558                bail_unsupported!("Source export with a foreign key")
1559            }
1560            TableConstraint::Check { .. } => {
1561                bail_unsupported!("Source export with a check constraint")
1562            }
1563        }
1564    }
1565
1566    let typ = SqlRelationType::new(column_types).with_keys(keys);
1567    let desc = RelationDesc::new(typ, names);
1568    Ok(desc)
1569}
1570
1571generate_extracted_config!(
1572    CreateSubsourceOption,
1573    (Progress, bool, Default(false)),
1574    (ExternalReference, UnresolvedItemName),
1575    (RetainHistory, OptionalDuration),
1576    (TextColumns, Vec::<Ident>, Default(vec![])),
1577    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1578    (Details, String)
1579);
1580
1581pub fn plan_create_subsource(
1582    scx: &StatementContext,
1583    stmt: CreateSubsourceStatement<Aug>,
1584) -> Result<Plan, PlanError> {
1585    let CreateSubsourceStatement {
1586        name,
1587        columns,
1588        of_source,
1589        constraints,
1590        if_not_exists,
1591        with_options,
1592    } = &stmt;
1593
1594    let CreateSubsourceOptionExtracted {
1595        progress,
1596        retain_history,
1597        external_reference,
1598        text_columns,
1599        exclude_columns,
1600        details,
1601        seen: _,
1602    } = with_options.clone().try_into()?;
1603
1604    // This invariant is enforced during purification; we are responsible for
1605    // creating the AST for subsources as a response to CREATE SOURCE
1606    // statements, so this would fire in integration testing if we failed to
1607    // uphold it.
1608    assert!(
1609        progress ^ (external_reference.is_some() && of_source.is_some()),
1610        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1611    );
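    // In other words, exactly one of the two legal shapes applies: a progress
    // subsource (the PROGRESS option alone) or a source-export subsource (an
    // external reference together with an `of_source` reference).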
1612
1613    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615    let data_source = if let Some(source_reference) = of_source {
1616        // If the new source table syntax is forced, we should not be creating any
1617        // non-progress subsources.
1618        if scx.catalog.system_vars().enable_create_table_from_source()
1619            && scx.catalog.system_vars().force_source_table_syntax()
1620        {
1621            Err(PlanError::UseTablesForSources(
1622                "CREATE SUBSOURCE".to_string(),
1623            ))?;
1624        }
1625
1626        // This is a subsource with the "natural" dependency order, i.e. it is
1627        // not a legacy subsource with the inverted structure.
1628        let ingestion_id = *source_reference.item_id();
1629        let external_reference = external_reference.unwrap();
1630
1631        // Decode the details option stored on the subsource statement, which contains information
1632        // created during the purification process.
1633        let details = details
1634            .as_ref()
1635            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1636        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1637        let details =
1638            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1639        let details =
1640            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1641        let details = match details {
1642            SourceExportStatementDetails::Postgres { table } => {
1643                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1644                    column_casts: crate::pure::postgres::generate_column_casts(
1645                        scx,
1646                        &table,
1647                        &text_columns,
1648                    )?,
1649                    table,
1650                })
1651            }
1652            SourceExportStatementDetails::MySql {
1653                table,
1654                initial_gtid_set,
1655            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1656                table,
1657                initial_gtid_set,
1658                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1659                exclude_columns: exclude_columns
1660                    .into_iter()
1661                    .map(|c| c.into_string())
1662                    .collect(),
1663            }),
1664            SourceExportStatementDetails::SqlServer {
1665                table,
1666                capture_instance,
1667                initial_lsn,
1668            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1669                capture_instance,
1670                table,
1671                initial_lsn,
1672                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1673                exclude_columns: exclude_columns
1674                    .into_iter()
1675                    .map(|c| c.into_string())
1676                    .collect(),
1677            }),
1678            SourceExportStatementDetails::LoadGenerator { output } => {
1679                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1680            }
1681            SourceExportStatementDetails::Kafka {} => {
1682                bail_unsupported!("subsources cannot reference Kafka sources")
1683            }
1684        };
1685        DataSourceDesc::IngestionExport {
1686            ingestion_id,
1687            external_reference,
1688            details,
1689            // Subsources don't currently support non-default envelopes / encoding
1690            data_config: SourceExportDataConfig {
1691                envelope: SourceEnvelope::None(NoneEnvelope {
1692                    key_envelope: KeyEnvelope::None,
1693                    key_arity: 0,
1694                }),
1695                encoding: None,
1696            },
1697        }
1698    } else if progress {
1699        DataSourceDesc::Progress
1700    } else {
1701        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1702    };
1703
1704    let if_not_exists = *if_not_exists;
1705    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1706
1707    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1708
1709    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1710    let source = Source {
1711        create_sql,
1712        data_source,
1713        desc,
1714        compaction_window,
1715    };
1716
1717    Ok(Plan::CreateSource(CreateSourcePlan {
1718        name,
1719        source,
1720        if_not_exists,
1721        timeline: Timeline::EpochMilliseconds,
1722        in_cluster: None,
1723    }))
1724}
1725
1726generate_extracted_config!(
1727    TableFromSourceOption,
1728    (TextColumns, Vec::<Ident>, Default(vec![])),
1729    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1730    (PartitionBy, Vec<Ident>),
1731    (RetainHistory, OptionalDuration),
1732    (Details, String)
1733);
1734
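/// Plans `CREATE TABLE ... FROM SOURCE`, which exports data from an existing
/// source into a new table, e.g. (illustrative):
///
///     CREATE TABLE t FROM SOURCE src (REFERENCE ns.tbl);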
1735pub fn plan_create_table_from_source(
1736    scx: &StatementContext,
1737    stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739    if !scx.catalog.system_vars().enable_create_table_from_source() {
1740        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741    }
1742
1743    let CreateTableFromSourceStatement {
1744        name,
1745        columns,
1746        constraints,
1747        if_not_exists,
1748        source,
1749        external_reference,
1750        envelope,
1751        format,
1752        include_metadata,
1753        with_options,
1754    } = &stmt;
1755
1756    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758    let TableFromSourceOptionExtracted {
1759        text_columns,
1760        exclude_columns,
1761        retain_history,
1762        partition_by,
1763        details,
1764        seen: _,
1765    } = with_options.clone().try_into()?;
1766
1767    let source_item = scx.get_item_by_resolved_name(source)?;
1768    let ingestion_id = source_item.id();
1769
1770    // Decode the details option stored on the statement, which contains information
1771    // created during the purification process.
1772    let details = details
1773        .as_ref()
1774        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776    let details =
1777        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778    let details =
1779        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782        && include_metadata
1783            .iter()
1784            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785    {
1786        // TODO(guswynn): should this be `bail_unsupported!`?
1787        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1788    }
1789    if !matches!(
1790        details,
1791        SourceExportStatementDetails::Kafka { .. }
1792            | SourceExportStatementDetails::LoadGenerator { .. }
1793    ) && !include_metadata.is_empty()
1794    {
1795        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796    }
1797
1798    let details = match details {
1799        SourceExportStatementDetails::Postgres { table } => {
1800            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1801                column_casts: crate::pure::postgres::generate_column_casts(
1802                    scx,
1803                    &table,
1804                    &text_columns,
1805                )?,
1806                table,
1807            })
1808        }
1809        SourceExportStatementDetails::MySql {
1810            table,
1811            initial_gtid_set,
1812        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1813            table,
1814            initial_gtid_set,
1815            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1816            exclude_columns: exclude_columns
1817                .into_iter()
1818                .map(|c| c.into_string())
1819                .collect(),
1820        }),
1821        SourceExportStatementDetails::SqlServer {
1822            table,
1823            capture_instance,
1824            initial_lsn,
1825        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1826            table,
1827            capture_instance,
1828            initial_lsn,
1829            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830            exclude_columns: exclude_columns
1831                .into_iter()
1832                .map(|c| c.into_string())
1833                .collect(),
1834        }),
1835        SourceExportStatementDetails::LoadGenerator { output } => {
1836            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1837        }
1838        SourceExportStatementDetails::Kafka {} => {
1839            if !include_metadata.is_empty()
1840                && !matches!(
1841                    envelope,
1842                    ast::SourceEnvelope::Upsert { .. }
1843                        | ast::SourceEnvelope::None
1844                        | ast::SourceEnvelope::Debezium
1845                )
1846            {
1847                // TODO(guswynn): should this be `bail_unsupported!`?
1848                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1849            }
1850
1851            let metadata_columns = include_metadata
1852                .into_iter()
1853                .flat_map(|item| match item {
1854                    SourceIncludeMetadata::Timestamp { alias } => {
1855                        let name = match alias {
1856                            Some(name) => name.to_string(),
1857                            None => "timestamp".to_owned(),
1858                        };
1859                        Some((name, KafkaMetadataKind::Timestamp))
1860                    }
1861                    SourceIncludeMetadata::Partition { alias } => {
1862                        let name = match alias {
1863                            Some(name) => name.to_string(),
1864                            None => "partition".to_owned(),
1865                        };
1866                        Some((name, KafkaMetadataKind::Partition))
1867                    }
1868                    SourceIncludeMetadata::Offset { alias } => {
1869                        let name = match alias {
1870                            Some(name) => name.to_string(),
1871                            None => "offset".to_owned(),
1872                        };
1873                        Some((name, KafkaMetadataKind::Offset))
1874                    }
1875                    SourceIncludeMetadata::Headers { alias } => {
1876                        let name = match alias {
1877                            Some(name) => name.to_string(),
1878                            None => "headers".to_owned(),
1879                        };
1880                        Some((name, KafkaMetadataKind::Headers))
1881                    }
1882                    SourceIncludeMetadata::Header {
1883                        alias,
1884                        key,
1885                        use_bytes,
1886                    } => Some((
1887                        alias.to_string(),
1888                        KafkaMetadataKind::Header {
1889                            key: key.clone(),
1890                            use_bytes: *use_bytes,
1891                        },
1892                    )),
1893                    SourceIncludeMetadata::Key { .. } => {
1894                        // handled below
1895                        None
1896                    }
1897                })
1898                .collect();
1899
1900            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1901        }
1902    };
1903
1904    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1905
1906    // Some source types (e.g. Postgres, MySQL, multi-output load generators) define a value
1907    // schema during purification and populate the statement's `columns` and `constraints`
1908    // fields, whereas others (e.g. Kafka, single-output load generators) do not, in which
1909    // case we fall back to the source connection's default schema.
1910    let (key_desc, value_desc) =
1911        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1912            let columns = match columns {
1913                TableFromSourceColumns::Defined(columns) => columns,
1914                _ => unreachable!(),
1915            };
1916            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1917            (None, desc)
1918        } else {
1919            let key_desc = source_connection.default_key_desc();
1920            let value_desc = source_connection.default_value_desc();
1921            (Some(key_desc), value_desc)
1922        };
1923
1924    let metadata_columns_desc = match &details {
1925        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1926            metadata_columns, ..
1927        }) => kafka_metadata_columns_desc(metadata_columns),
1928        _ => vec![],
1929    };
1930
1931    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1932        scx,
1933        &envelope,
1934        format,
1935        key_desc,
1936        value_desc,
1937        include_metadata,
1938        metadata_columns_desc,
1939        source_connection,
1940    )?;
1941    if let TableFromSourceColumns::Named(col_names) = columns {
1942        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1943    }
1944
1945    let names: Vec<_> = desc.iter_names().cloned().collect();
1946    if let Some(dup) = names.iter().duplicates().next() {
1947        sql_bail!("column {} specified more than once", dup.quoted());
1948    }
1949
1950    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1951
1952    // Allow users to specify a timeline. If they do not, determine a default
1953    // timeline for the source.
1954    let timeline = match envelope {
1955        SourceEnvelope::CdcV2 => {
1956            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1957        }
1958        _ => Timeline::EpochMilliseconds,
1959    };
1960
1961    if let Some(partition_by) = partition_by {
1962        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1963        check_partition_by(&desc, partition_by)?;
1964    }
1965
1966    let data_source = DataSourceDesc::IngestionExport {
1967        ingestion_id,
1968        external_reference: external_reference
1969            .as_ref()
1970            .expect("populated in purification")
1971            .clone(),
1972        details,
1973        data_config: SourceExportDataConfig { envelope, encoding },
1974    };
1975
1976    let if_not_exists = *if_not_exists;
1977
1978    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1979
1980    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1981    let table = Table {
1982        create_sql,
1983        desc: VersionedRelationDesc::new(desc),
1984        temporary: false,
1985        compaction_window,
1986        data_source: TableDataSource::DataSource {
1987            desc: data_source,
1988            timeline,
1989        },
1990    };
1991
1992    Ok(Plan::CreateTable(CreateTablePlan {
1993        name,
1994        table,
1995        if_not_exists,
1996    }))
1997}
1998
1999generate_extracted_config!(
2000    LoadGeneratorOption,
2001    (TickInterval, Duration),
2002    (AsOf, u64, Default(0_u64)),
2003    (UpTo, u64, Default(u64::MAX)),
2004    (ScaleFactor, f64),
2005    (MaxCardinality, u64),
2006    (Keys, u64),
2007    (SnapshotRounds, u64),
2008    (TransactionalSnapshot, bool),
2009    (ValueSize, u64),
2010    (Seed, u64),
2011    (Partitions, u64),
2012    (BatchSize, u64)
2013);
2014
2015impl LoadGeneratorOptionExtracted {
2016    pub(super) fn ensure_only_valid_options(
2017        &self,
2018        loadgen: &ast::LoadGenerator,
2019    ) -> Result<(), PlanError> {
2020        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2021
2022        let mut options = self.seen.clone();
2023
2024        let permitted_options: &[_] = match loadgen {
2025            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2026            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2027            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2028            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2029            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2030            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2031            ast::LoadGenerator::KeyValue => &[
2032                TickInterval,
2033                Keys,
2034                SnapshotRounds,
2035                TransactionalSnapshot,
2036                ValueSize,
2037                Seed,
2038                Partitions,
2039                BatchSize,
2040            ],
2041        };
2042
2043        for o in permitted_options {
2044            options.remove(o);
2045        }
2046
2047        if !options.is_empty() {
2048            sql_bail!(
2049                "{} load generators do not support {} values",
2050                loadgen,
2051                options.iter().join(", ")
2052            )
2053        }
2054
2055        Ok(())
2056    }
2057}
2058
2059pub(crate) fn load_generator_ast_to_generator(
2060    scx: &StatementContext,
2061    loadgen: &ast::LoadGenerator,
2062    options: &[LoadGeneratorOption<Aug>],
2063    include_metadata: &[SourceIncludeMetadata],
2064) -> Result<LoadGenerator, PlanError> {
2065    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2066    extracted.ensure_only_valid_options(loadgen)?;
2067
2068    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2069        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2070    }
2071
2072    let load_generator = match loadgen {
2073        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2074        ast::LoadGenerator::Clock => {
2075            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2076            LoadGenerator::Clock
2077        }
2078        ast::LoadGenerator::Counter => {
2079            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2080            let LoadGeneratorOptionExtracted {
2081                max_cardinality, ..
2082            } = extracted;
2083            LoadGenerator::Counter { max_cardinality }
2084        }
2085        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2086        ast::LoadGenerator::Datums => {
2087            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2088            LoadGenerator::Datums
2089        }
2090        ast::LoadGenerator::Tpch => {
2091            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2092
2093            // Default to 0.01 scale factor (=10MB).
2094            let sf: f64 = scale_factor.unwrap_or(0.01);
2095            if !sf.is_finite() || sf < 0.0 {
2096                sql_bail!("unsupported scale factor {sf}");
2097            }
2098
2099            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2100                let total = (sf * multiplier).floor();
2101                let mut i = i64::try_cast_from(total)
2102                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2103                if i < 1 {
2104                    i = 1;
2105                }
2106                Ok(i)
2107            };
2108
2109            // The multiplications here are safely unchecked because they will
2110            // overflow to infinity, which will be caught by `f_to_i`.
2111            let count_supplier = f_to_i(10_000f64)?;
2112            let count_part = f_to_i(200_000f64)?;
2113            let count_customer = f_to_i(150_000f64)?;
2114            let count_orders = f_to_i(150_000f64 * 10f64)?;
2115            let count_clerk = f_to_i(1_000f64)?;
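            // For example, at the default scale factor of 0.01:
            // count_supplier = 100, count_part = 2_000, count_customer = 1_500,
            // count_orders = 15_000, count_clerk = 10.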
2116
2117            LoadGenerator::Tpch {
2118                count_supplier,
2119                count_part,
2120                count_customer,
2121                count_orders,
2122                count_clerk,
2123            }
2124        }
2125        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2126            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2127            let LoadGeneratorOptionExtracted {
2128                keys,
2129                snapshot_rounds,
2130                transactional_snapshot,
2131                value_size,
2132                tick_interval,
2133                seed,
2134                partitions,
2135                batch_size,
2136                ..
2137            } = extracted;
2138
2139            let mut include_offset = None;
2140            for im in include_metadata {
2141                match im {
2142                    SourceIncludeMetadata::Offset { alias } => {
2143                        include_offset = match alias {
2144                            Some(alias) => Some(alias.to_string()),
2145                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2146                        }
2147                    }
2148                    SourceIncludeMetadata::Key { .. } => continue,
2149
2150                    _ => {
2151                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2152                    }
2153                };
2154            }
2155
2156            let lgkv = KeyValueLoadGenerator {
2157                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2158                snapshot_rounds: snapshot_rounds
2159                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2160                // Defaults to true.
2161                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2162                value_size: value_size
2163                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2164                partitions: partitions
2165                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2166                tick_interval,
2167                batch_size: batch_size
2168                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2169                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2170                include_offset,
2171            };
2172
2173            if lgkv.keys == 0
2174                || lgkv.partitions == 0
2175                || lgkv.value_size == 0
2176                || lgkv.batch_size == 0
2177            {
2178                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2179            }
2180
2181            if lgkv.keys % lgkv.partitions != 0 {
2182                sql_bail!("KEYS must be a multiple of PARTITIONS")
2183            }
2184
2185            if lgkv.batch_size > lgkv.keys {
2186                sql_bail!("KEYS must be at least as large as BATCH SIZE")
2187            }
2188
2189            // This constraint simplifies the source implementation.
2190            // We can lift it later.
2191            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2192                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2193            }
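            // For example, KEYS 128, PARTITIONS 4, BATCH SIZE 8 is accepted
            // (128 % 4 == 0 and (128 / 4) % 8 == 0), while BATCH SIZE 12 would
            // be rejected because (128 / 4) % 12 != 0.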
2194
2195            if lgkv.snapshot_rounds == 0 {
2196                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2197            }
2198
2199            LoadGenerator::KeyValue(lgkv)
2200        }
2201    };
2202
2203    Ok(load_generator)
2204}
2205
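/// Checks that a Debezium-shaped value relation has the expected structure:
/// a required `after` column and an optional `before` column of the same
/// record type, i.e. roughly `{"before": <row or null>, "after": <row or
/// null>}`. Returns the indices of those columns in `value_desc`.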
2206fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2207    let before = value_desc.get_by_name(&"before".into());
2208    let (after_idx, after_ty) = value_desc
2209        .get_by_name(&"after".into())
2210        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2211    let before_idx = if let Some((before_idx, before_ty)) = before {
2212        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2213            sql_bail!("'before' column must be of type record");
2214        }
2215        if before_ty != after_ty {
2216            sql_bail!("'before' type differs from 'after' column");
2217        }
2218        Some(before_idx)
2219    } else {
2220        None
2221    };
2222    Ok((before_idx, after_idx))
2223}
2224
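/// Resolves a `FORMAT`/`KEY FORMAT ... VALUE FORMAT` specifier to a
/// `SourceDataEncoding`. A bare `FORMAT` usually produces a value-only
/// encoding (a Confluent Schema Registry Avro schema may also carry a key
/// schema), while the key/value form produces both; envelopes that require a
/// key (`DEBEZIUM`, `UPSERT`) reject encodings without one.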
2225fn get_encoding(
2226    scx: &StatementContext,
2227    format: &FormatSpecifier<Aug>,
2228    envelope: &ast::SourceEnvelope,
2229) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2230    let encoding = match format {
2231        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2232        FormatSpecifier::KeyValue { key, value } => {
2233            let key = {
2234                let encoding = get_encoding_inner(scx, key)?;
2235                Some(encoding.key.unwrap_or(encoding.value))
2236            };
2237            let value = get_encoding_inner(scx, value)?.value;
2238            SourceDataEncoding { key, value }
2239        }
2240    };
2241
2242    let requires_keyvalue = matches!(
2243        envelope,
2244        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2245    );
2246    let is_keyvalue = encoding.key.is_some();
2247    if requires_keyvalue && !is_keyvalue {
2248        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2249    };
2250
2251    Ok(encoding)
2252}
2253
2254/// Determine the cluster ID to use for this item.
2255///
2256/// If `in_cluster` is `None`, we will update it to refer to the default cluster.
2257/// Because of this, do not normalize/canonicalize the create SQL statement
2258/// until after calling this function.
2259fn source_sink_cluster_config<'a, 'ctx>(
2260    scx: &'a StatementContext<'ctx>,
2261    in_cluster: &mut Option<ResolvedClusterName>,
2262) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2263    let cluster = match in_cluster {
2264        None => {
2265            let cluster = scx.catalog.resolve_cluster(None)?;
2266            *in_cluster = Some(ResolvedClusterName {
2267                id: cluster.id(),
2268                print_name: None,
2269            });
2270            cluster
2271        }
2272        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2273    };
2274
2275    Ok(cluster)
2276}
2277
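// "Confluent wire format" refers to the framing used by the Confluent Schema
// Registry: each Avro message is prefixed with a zero magic byte followed by
// a 4-byte big-endian schema ID.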
2278generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2279
2280#[derive(Debug)]
2281pub struct Schema {
2282    pub key_schema: Option<String>,
2283    pub value_schema: String,
2284    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2285    pub confluent_wire_format: bool,
2286}
2287
2288fn get_encoding_inner(
2289    scx: &StatementContext,
2290    format: &Format<Aug>,
2291) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2292    let value = match format {
2293        Format::Bytes => DataEncoding::Bytes,
2294        Format::Avro(schema) => {
2295            let Schema {
2296                key_schema,
2297                value_schema,
2298                csr_connection,
2299                confluent_wire_format,
2300            } = match schema {
2301                // TODO(jldlaughlin): we need a way to pass in primary key information
2302                // when building a source from a string or file.
2303                AvroSchema::InlineSchema {
2304                    schema: ast::Schema { schema },
2305                    with_options,
2306                } => {
2307                    let AvroSchemaOptionExtracted {
2308                        confluent_wire_format,
2309                        ..
2310                    } = with_options.clone().try_into()?;
2311
2312                    Schema {
2313                        key_schema: None,
2314                        value_schema: schema.clone(),
2315                        csr_connection: None,
2316                        confluent_wire_format,
2317                    }
2318                }
2319                AvroSchema::Csr {
2320                    csr_connection:
2321                        CsrConnectionAvro {
2322                            connection,
2323                            seed,
2324                            key_strategy: _,
2325                            value_strategy: _,
2326                        },
2327                } => {
2328                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2329                    let csr_connection = match item.connection()? {
2330                        Connection::Csr(_) => item.id(),
2331                        _ => {
2332                            sql_bail!(
2333                                "{} is not a schema registry connection",
2334                                scx.catalog
2335                                    .resolve_full_name(item.name())
2336                                    .to_string()
2337                                    .quoted()
2338                            )
2339                        }
2340                    };
2341
2342                    if let Some(seed) = seed {
2343                        Schema {
2344                            key_schema: seed.key_schema.clone(),
2345                            value_schema: seed.value_schema.clone(),
2346                            csr_connection: Some(csr_connection),
2347                            confluent_wire_format: true,
2348                        }
2349                    } else {
2350                        unreachable!("CSR seed resolution should already have been called: Avro")
2351                    }
2352                }
2353            };
2354
2355            if let Some(key_schema) = key_schema {
2356                return Ok(SourceDataEncoding {
2357                    key: Some(DataEncoding::Avro(AvroEncoding {
2358                        schema: key_schema,
2359                        csr_connection: csr_connection.clone(),
2360                        confluent_wire_format,
2361                    })),
2362                    value: DataEncoding::Avro(AvroEncoding {
2363                        schema: value_schema,
2364                        csr_connection,
2365                        confluent_wire_format,
2366                    }),
2367                });
2368            } else {
2369                DataEncoding::Avro(AvroEncoding {
2370                    schema: value_schema,
2371                    csr_connection,
2372                    confluent_wire_format,
2373                })
2374            }
2375        }
2376        Format::Protobuf(schema) => match schema {
2377            ProtobufSchema::Csr {
2378                csr_connection:
2379                    CsrConnectionProtobuf {
2380                        connection:
2381                            CsrConnection {
2382                                connection,
2383                                options,
2384                            },
2385                        seed,
2386                    },
2387            } => {
2388                if let Some(CsrSeedProtobuf { key, value }) = seed {
2389                    let item = scx.get_item_by_resolved_name(connection)?;
2390                    let _ = match item.connection()? {
2391                        Connection::Csr(connection) => connection,
2392                        _ => {
2393                            sql_bail!(
2394                                "{} is not a schema registry connection",
2395                                scx.catalog
2396                                    .resolve_full_name(item.name())
2397                                    .to_string()
2398                                    .quoted()
2399                            )
2400                        }
2401                    };
2402
2403                    if !options.is_empty() {
2404                        sql_bail!("Protobuf CSR connections do not support any options");
2405                    }
2406
2407                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2408                        descriptors: strconv::parse_bytes(&value.schema)?,
2409                        message_name: value.message_name.clone(),
2410                        confluent_wire_format: true,
2411                    });
2412                    if let Some(key) = key {
2413                        return Ok(SourceDataEncoding {
2414                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2415                                descriptors: strconv::parse_bytes(&key.schema)?,
2416                                message_name: key.message_name.clone(),
2417                                confluent_wire_format: true,
2418                            })),
2419                            value,
2420                        });
2421                    }
2422                    value
2423                } else {
2424                    unreachable!("CSR seed resolution should already have been called: Proto")
2425                }
2426            }
2427            ProtobufSchema::InlineSchema {
2428                message_name,
2429                schema: ast::Schema { schema },
2430            } => {
2431                let descriptors = strconv::parse_bytes(schema)?;
2432
2433                DataEncoding::Protobuf(ProtobufEncoding {
2434                    descriptors,
2435                    message_name: message_name.to_owned(),
2436                    confluent_wire_format: false,
2437                })
2438            }
2439        },
2440        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2441            regex: mz_repr::adt::regex::Regex::new(regex, false)
2442                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2443        }),
2444        Format::Csv { columns, delimiter } => {
2445            let columns = match columns {
2446                CsvColumns::Header { names } => {
2447                    if names.is_empty() {
2448                        sql_bail!("[internal error] column spec should get names in purify")
2449                    }
2450                    ColumnSpec::Header {
2451                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2452                    }
2453                }
2454                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2455            };
2456            DataEncoding::Csv(CsvEncoding {
2457                columns,
2458                delimiter: u8::try_from(*delimiter)
2459                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2460            })
2461        }
2462        Format::Json { array: false } => DataEncoding::Json,
2463        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2464        Format::Text => DataEncoding::Text,
2465    };
2466    Ok(SourceDataEncoding { key: None, value })
2467}
2468
2469/// Extract the key envelope, if it is requested
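/// For example, `INCLUDE KEY AS k` with a key format yields
/// `KeyEnvelope::Named("k")`, while a bare `INCLUDE KEY` defers to
/// `get_unnamed_key_envelope`.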
2470fn get_key_envelope(
2471    included_items: &[SourceIncludeMetadata],
2472    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2473    key_envelope_no_encoding: bool,
2474) -> Result<KeyEnvelope, PlanError> {
2475    let key_definition = included_items
2476        .iter()
2477        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2478    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2479        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2480            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2481            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2482            (Some(name), _) if key_envelope_no_encoding => {
2483                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2484            }
2485            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2486            (_, None) => {
2487                // `alias == None` means `INCLUDE KEY`;
2488                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
2489                // Both cases warrant the same error message.
2490                sql_bail!(
2491                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2492                        got bare FORMAT"
2493                );
2494            }
2495        }
2496    } else {
2497        Ok(KeyEnvelope::None)
2498    }
2499}
2500
2501/// Gets the key envelope for a given key encoding when no name for the key has
2502/// been requested by the user.
2503fn get_unnamed_key_envelope(
2504    key: Option<&DataEncoding<ReferencedConnection>>,
2505) -> Result<KeyEnvelope, PlanError> {
2506    // If the key is requested but comes from an unnamed type then it gets the name "key"
2507    //
2508    // Otherwise it gets the names of the columns in the type
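    // For example, an Avro record key with fields `a` and `b` is flattened
    // into columns `a` and `b`, while a `TEXT` key becomes a single column
    // named "key".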
2509    let is_composite = match key {
2510        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2511        Some(
2512            DataEncoding::Avro(_)
2513            | DataEncoding::Csv(_)
2514            | DataEncoding::Protobuf(_)
2515            | DataEncoding::Regex { .. },
2516        ) => true,
2517        None => false,
2518    };
2519
2520    if is_composite {
2521        Ok(KeyEnvelope::Flattened)
2522    } else {
2523        Ok(KeyEnvelope::Named("key".to_string()))
2524    }
2525}
2526
2527pub fn describe_create_view(
2528    _: &StatementContext,
2529    _: CreateViewStatement<Aug>,
2530) -> Result<StatementDesc, PlanError> {
2531    Ok(StatementDesc::new(None))
2532}
2533
2534pub fn plan_view(
2535    scx: &StatementContext,
2536    def: &mut ViewDefinition<Aug>,
2537    temporary: bool,
2538) -> Result<(QualifiedItemName, View), PlanError> {
2539    let create_sql = normalize::create_statement(
2540        scx,
2541        Statement::CreateView(CreateViewStatement {
2542            if_exists: IfExistsBehavior::Error,
2543            temporary,
2544            definition: def.clone(),
2545        }),
2546    )?;
2547
2548    let ViewDefinition {
2549        name,
2550        columns,
2551        query,
2552    } = def;
2553
2554    let query::PlannedRootQuery {
2555        expr,
2556        mut desc,
2557        finishing,
2558        scope: _,
2559    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2560    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2561    // Note: Earlier, we considered persisting the finishing information with the view
2562    // here to help with database-issues#236. However, in the meantime, there might be a better
2563    // approach to solve database-issues#236:
2564    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2565    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2566        &finishing,
2567        expr.arity()
2568    ));
2569    if expr.contains_parameters()? {
2570        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2571    }
2572
2573    let dependencies = expr
2574        .depends_on()
2575        .into_iter()
2576        .map(|gid| scx.catalog.resolve_item_id(&gid))
2577        .collect();
2578
2579    let name = if temporary {
2580        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2581    } else {
2582        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2583    };
2584
2585    plan_utils::maybe_rename_columns(
2586        format!("view {}", scx.catalog.resolve_full_name(&name)),
2587        &mut desc,
2588        columns,
2589    )?;
2590    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2591
2592    if let Some(dup) = names.iter().duplicates().next() {
2593        sql_bail!("column {} specified more than once", dup.quoted());
2594    }
2595
2596    let view = View {
2597        create_sql,
2598        expr,
2599        dependencies,
2600        column_names: names,
2601        temporary,
2602    };
2603
2604    Ok((name, view))
2605}
2606
2607pub fn plan_create_view(
2608    scx: &StatementContext,
2609    mut stmt: CreateViewStatement<Aug>,
2610) -> Result<Plan, PlanError> {
2611    let CreateViewStatement {
2612        temporary,
2613        if_exists,
2614        definition,
2615    } = &mut stmt;
2616    let (name, view) = plan_view(scx, definition, *temporary)?;
2617
2618    // Override the statement-level IfExistsBehavior with Skip if this is
2619    // explicitly requested in the PlanContext (the default is `false`).
2620    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2621
2622    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2623        let if_exists = true;
2624        let cascade = false;
2625        let maybe_item_to_drop = plan_drop_item(
2626            scx,
2627            ObjectType::View,
2628            if_exists,
2629            definition.name.clone(),
2630            cascade,
2631        )?;
2632
2633        // Check if the new View depends on the item that we would be replacing.
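        // For example, `CREATE OR REPLACE VIEW v AS SELECT ... FROM v` must be
        // rejected, because the new definition would depend on the very view
        // it replaces.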
2634        if let Some(id) = maybe_item_to_drop {
2635            let dependencies = view.expr.depends_on();
2636            let invalid_drop = scx
2637                .get_item(&id)
2638                .global_ids()
2639                .any(|gid| dependencies.contains(&gid));
2640            if invalid_drop {
2641                let item = scx.catalog.get_item(&id);
2642                sql_bail!(
2643                    "cannot replace view {0}: depended upon by new {0} definition",
2644                    scx.catalog.resolve_full_name(item.name())
2645                );
2646            }
2647
2648            Some(id)
2649        } else {
2650            None
2651        }
2652    } else {
2653        None
2654    };
2655    let drop_ids = replace
2656        .map(|id| {
2657            scx.catalog
2658                .item_dependents(id)
2659                .into_iter()
2660                .map(|id| id.unwrap_item_id())
2661                .collect()
2662        })
2663        .unwrap_or_default();
2664
2665    // Check for an object in the catalog with this same name
2666    let full_name = scx.catalog.resolve_full_name(&name);
2667    let partial_name = PartialItemName::from(full_name.clone());
2668    // For PostgreSQL compatibility, we need to prevent creating views when
2669    // there is an existing object *or* type of the same name.
2670    if let (Ok(item), IfExistsBehavior::Error, false) = (
2671        scx.catalog.resolve_item_or_type(&partial_name),
2672        *if_exists,
2673        ignore_if_exists_errors,
2674    ) {
2675        return Err(PlanError::ItemAlreadyExists {
2676            name: full_name.to_string(),
2677            item_type: item.item_type(),
2678        });
2679    }
2680
2681    Ok(Plan::CreateView(CreateViewPlan {
2682        name,
2683        view,
2684        replace,
2685        drop_ids,
2686        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2687        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2688    }))
2689}
2690
2691pub fn describe_create_materialized_view(
2692    _: &StatementContext,
2693    _: CreateMaterializedViewStatement<Aug>,
2694) -> Result<StatementDesc, PlanError> {
2695    Ok(StatementDesc::new(None))
2696}
2697
2698pub fn describe_create_continual_task(
2699    _: &StatementContext,
2700    _: CreateContinualTaskStatement<Aug>,
2701) -> Result<StatementDesc, PlanError> {
2702    Ok(StatementDesc::new(None))
2703}
2704
2705pub fn describe_create_network_policy(
2706    _: &StatementContext,
2707    _: CreateNetworkPolicyStatement<Aug>,
2708) -> Result<StatementDesc, PlanError> {
2709    Ok(StatementDesc::new(None))
2710}
2711
2712pub fn describe_alter_network_policy(
2713    _: &StatementContext,
2714    _: AlterNetworkPolicyStatement<Aug>,
2715) -> Result<StatementDesc, PlanError> {
2716    Ok(StatementDesc::new(None))
2717}
2718
2719pub fn plan_create_materialized_view(
2720    scx: &StatementContext,
2721    mut stmt: CreateMaterializedViewStatement<Aug>,
2722) -> Result<Plan, PlanError> {
2723    let cluster_id =
2724        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2725    stmt.in_cluster = Some(ResolvedClusterName {
2726        id: cluster_id,
2727        print_name: None,
2728    });
2729
2730    let create_sql =
2731        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2732
2733    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2734    let name = scx.allocate_qualified_name(partial_name.clone())?;
2735
2736    let query::PlannedRootQuery {
2737        expr,
2738        mut desc,
2739        finishing,
2740        scope: _,
2741    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2742    // We get back a trivial finishing, see comment in `plan_view`.
2743    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2744        &finishing,
2745        expr.arity()
2746    ));
2747    if expr.contains_parameters()? {
2748        return Err(PlanError::ParameterNotAllowed(
2749            "materialized views".to_string(),
2750        ));
2751    }
2752
2753    plan_utils::maybe_rename_columns(
2754        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2755        &mut desc,
2756        &stmt.columns,
2757    )?;
2758    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2759
2760    let MaterializedViewOptionExtracted {
2761        assert_not_null,
2762        partition_by,
2763        retain_history,
2764        refresh,
2765        seen: _,
2766    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2767
2768    if let Some(partition_by) = partition_by {
2769        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2770        check_partition_by(&desc, partition_by)?;
2771    }
2772
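    // Fold the (possibly multiple) REFRESH options into a single
    // `RefreshSchedule`. For example (illustrative values): `REFRESH EVERY
    // '1 hour' ALIGNED TO '2024-01-01 00:00:00'` contributes one
    // `RefreshEvery` entry, while `REFRESH AT '2024-01-01 00:00:00'`
    // contributes one `ats` entry.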
2773    let refresh_schedule = {
2774        let mut refresh_schedule = RefreshSchedule::default();
2775        let mut on_commits_seen = 0;
2776        for refresh_option_value in refresh {
2777            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2778                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2779            }
2780            match refresh_option_value {
2781                RefreshOptionValue::OnCommit => {
2782                    on_commits_seen += 1;
2783                }
2784                RefreshOptionValue::AtCreation => {
2785                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2786                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2787                }
2788                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2789                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2790                    let ecx = &ExprContext {
2791                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2792                        name: "REFRESH AT",
2793                        scope: &Scope::empty(),
2794                        relation_type: &SqlRelationType::empty(),
2795                        allow_aggregates: false,
2796                        allow_subqueries: false,
2797                        allow_parameters: false,
2798                        allow_windows: false,
2799                    };
2800                    let hir = plan_expr(ecx, &time)?.cast_to(
2801                        ecx,
2802                        CastContext::Assignment,
2803                        &SqlScalarType::MzTimestamp,
2804                    )?;
2805                    // (mz_now was purified away to a literal earlier)
2806                    let timestamp = hir
2807                        .into_literal_mz_timestamp()
2808                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2809                    refresh_schedule.ats.push(timestamp);
2810                }
2811                RefreshOptionValue::Every(RefreshEveryOptionValue {
2812                    interval,
2813                    aligned_to,
2814                }) => {
2815                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2816                    if interval.as_microseconds() <= 0 {
2817                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2818                    }
2819                    if interval.months != 0 {
2820                        // This limitation exists because we want Intervals to be cleanly
2821                        // convertible to a unix epoch timestamp difference. When the interval
2822                        // involves months, this no longer holds, because months have variable
2823                        // lengths. See `Timestamp::round_up`.
2824                        sql_bail!("REFRESH interval must not involve units larger than days");
2825                    }
2826                    let interval = interval.duration()?;
2827                    if u64::try_from(interval.as_millis()).is_err() {
2828                        sql_bail!("REFRESH interval too large");
2829                    }
2830
2831                    let mut aligned_to = match aligned_to {
2832                        Some(aligned_to) => aligned_to,
2833                        None => {
2834                            soft_panic_or_log!(
2835                                "ALIGNED TO should have been filled in by purification"
2836                            );
2837                            sql_bail!(
2838                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2839                            )
2840                        }
2841                    };
2842
2843                    // Desugar the `aligned_to` expression
2844                    transform_ast::transform(scx, &mut aligned_to)?;
2845
2846                    let ecx = &ExprContext {
2847                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2848                        name: "REFRESH EVERY ... ALIGNED TO",
2849                        scope: &Scope::empty(),
2850                        relation_type: &SqlRelationType::empty(),
2851                        allow_aggregates: false,
2852                        allow_subqueries: false,
2853                        allow_parameters: false,
2854                        allow_windows: false,
2855                    };
2856                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2857                        ecx,
2858                        CastContext::Assignment,
2859                        &SqlScalarType::MzTimestamp,
2860                    )?;
2861                    // (mz_now was purified away to a literal earlier)
2862                    let aligned_to_const = aligned_to_hir
2863                        .into_literal_mz_timestamp()
2864                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2865
2866                    refresh_schedule.everies.push(RefreshEvery {
2867                        interval,
2868                        aligned_to: aligned_to_const,
2869                    });
2870                }
2871            }
2872        }
2873
2874        if on_commits_seen > 1 {
2875            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2876        }
2877        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2878            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2879        }
2880
2881        if refresh_schedule == RefreshSchedule::default() {
2882            None
2883        } else {
2884            Some(refresh_schedule)
2885        }
2886    };
2887
2888    let as_of = stmt.as_of.map(Timestamp::from);
2889    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2890    let mut non_null_assertions = assert_not_null
2891        .into_iter()
2892        .map(normalize::column_name)
2893        .map(|assertion_name| {
2894            column_names
2895                .iter()
2896                .position(|col| col == &assertion_name)
2897                .ok_or_else(|| {
2898                    sql_err!(
2899                        "column {} in ASSERT NOT NULL option not found",
2900                        assertion_name.quoted()
2901                    )
2902                })
2903        })
2904        .collect::<Result<Vec<_>, _>>()?;
2905    non_null_assertions.sort();
2906    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2907        let dup = &column_names[*dup];
2908        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2909    }
2910
2911    if let Some(dup) = column_names.iter().duplicates().next() {
2912        sql_bail!("column {} specified more than once", dup.quoted());
2913    }
2914
2915    // Override the statement-level IfExistsBehavior with Skip if this is
2916    // explicitly requested in the PlanContext (the default is `false`).
2917    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2918        Ok(true) => IfExistsBehavior::Skip,
2919        _ => stmt.if_exists,
2920    };
2921
2922    let mut replace = None;
2923    let mut if_not_exists = false;
2924    match if_exists {
2925        IfExistsBehavior::Replace => {
2926            let if_exists = true;
2927            let cascade = false;
2928            let replace_id = plan_drop_item(
2929                scx,
2930                ObjectType::MaterializedView,
2931                if_exists,
2932                partial_name.into(),
2933                cascade,
2934            )?;
2935
2936            // Check if the new Materialized View depends on the item that we would be replacing.
2937            if let Some(id) = replace_id {
2938                let dependencies = expr.depends_on();
2939                let invalid_drop = scx
2940                    .get_item(&id)
2941                    .global_ids()
2942                    .any(|gid| dependencies.contains(&gid));
2943                if invalid_drop {
2944                    let item = scx.catalog.get_item(&id);
2945                    sql_bail!(
2946                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2947                        scx.catalog.resolve_full_name(item.name())
2948                    );
2949                }
2950                replace = Some(id);
2951            }
2952        }
2953        IfExistsBehavior::Skip => if_not_exists = true,
2954        IfExistsBehavior::Error => (),
2955    }
2956    let drop_ids = replace
2957        .map(|id| {
2958            scx.catalog
2959                .item_dependents(id)
2960                .into_iter()
2961                .map(|id| id.unwrap_item_id())
2962                .collect()
2963        })
2964        .unwrap_or_default();
2965    let dependencies = expr
2966        .depends_on()
2967        .into_iter()
2968        .map(|gid| scx.catalog.resolve_item_id(&gid))
2969        .collect();
2970
2971    // Check for an object in the catalog with this same name
2972    let full_name = scx.catalog.resolve_full_name(&name);
2973    let partial_name = PartialItemName::from(full_name.clone());
2974    // For PostgreSQL compatibility, we need to prevent creating materialized
2975    // views when there is an existing object *or* type of the same name.
2976    if let (IfExistsBehavior::Error, Ok(item)) =
2977        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
2978    {
2979        return Err(PlanError::ItemAlreadyExists {
2980            name: full_name.to_string(),
2981            item_type: item.item_type(),
2982        });
2983    }
2984
2985    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
2986        name,
2987        materialized_view: MaterializedView {
2988            create_sql,
2989            expr,
2990            dependencies,
2991            column_names,
2992            cluster_id,
2993            non_null_assertions,
2994            compaction_window,
2995            refresh_schedule,
2996            as_of,
2997        },
2998        replace,
2999        drop_ids,
3000        if_not_exists,
3001        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3002    }))
3003}
3004
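// Generates the `MaterializedViewOptionExtracted` struct destructured above:
// one field per option, with options marked `AllowMultiple` collected into
// vectors.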
3005generate_extracted_config!(
3006    MaterializedViewOption,
3007    (AssertNotNull, Ident, AllowMultiple),
3008    (PartitionBy, Vec<Ident>),
3009    (RetainHistory, OptionalDuration),
3010    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
3011);
3012
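/// Plans a `CREATE CONTINUAL TASK` statement. Each `INSERT` statement in the
/// task body is planned as an appending query and each `DELETE` as a
/// retracting (negated) query; the per-statement plans are unioned into a
/// single expression over the task's input.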
3013pub fn plan_create_continual_task(
3014    scx: &StatementContext,
3015    mut stmt: CreateContinualTaskStatement<Aug>,
3016) -> Result<Plan, PlanError> {
3017    match &stmt.sugar {
3018        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3019        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3020            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3021        }
3022        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3023            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3024        }
3025    };
3026    let cluster_id = match &stmt.in_cluster {
3027        None => scx.catalog.resolve_cluster(None)?.id(),
3028        Some(in_cluster) => in_cluster.id,
3029    };
3030    stmt.in_cluster = Some(ResolvedClusterName {
3031        id: cluster_id,
3032        print_name: None,
3033    });
3034
3035    let create_sql =
3036        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3037
3038    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3039
3040    // It seems desirable for a CT that e.g. simply filters the input to keep
3041    // the same nullability. So, start by assuming all columns are non-nullable,
3042    // and then make them nullable below if any of the exprs plan them as
3043    // nullable.
3044    let mut desc = match stmt.columns {
3045        None => None,
3046        Some(columns) => {
3047            let mut desc_columns = Vec::with_capacity(columns.len());
3048            for col in columns.iter() {
3049                desc_columns.push((
3050                    normalize::column_name(col.name.clone()),
3051                    SqlColumnType {
3052                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3053                        nullable: false,
3054                    },
3055                ));
3056            }
3057            Some(RelationDesc::from_names_and_types(desc_columns))
3058        }
3059    };
3060    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3061    match input.item_type() {
3062        // Input must be a thing directly backed by a persist shard, so we can
3063        // use a persist listen to efficiently rehydrate.
3064        CatalogItemType::ContinualTask
3065        | CatalogItemType::Table
3066        | CatalogItemType::MaterializedView
3067        | CatalogItemType::Source => {}
3068        CatalogItemType::Sink
3069        | CatalogItemType::View
3070        | CatalogItemType::Index
3071        | CatalogItemType::Type
3072        | CatalogItemType::Func
3073        | CatalogItemType::Secret
3074        | CatalogItemType::Connection => {
3075            sql_bail!(
3076                "CONTINUAL TASK cannot use {} as an input",
3077                input.item_type()
3078            );
3079        }
3080    }
3081
3082    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3083    let ct_name = stmt.name;
3084    let placeholder_id = match &ct_name {
3085        ResolvedItemName::ContinualTask { id, name } => {
3086            let desc = match desc.as_ref().cloned() {
3087                Some(x) => x,
3088                None => {
3089                    // The user didn't specify the CT's columns. Take a wild
3090                    // guess that the CT has the same shape as the input. It's
3091                    // fine if this is wrong, we'll get an error below after
3092                    // planning the query.
3093                    let input_name = scx.catalog.resolve_full_name(input.name());
3094                    input.desc(&input_name)?.into_owned()
3095                }
3096            };
3097            qcx.ctes.insert(
3098                *id,
3099                CteDesc {
3100                    name: name.item.clone(),
3101                    desc,
3102                },
3103            );
3104            Some(*id)
3105        }
3106        _ => None,
3107    };
3108
3109    let mut exprs = Vec::new();
3110    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3111        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3112        let query::PlannedRootQuery {
3113            expr,
3114            desc: desc_query,
3115            finishing,
3116            scope: _,
3117        } = query::plan_ct_query(&mut qcx, query)?;
3118        // We get back a trivial finishing because we plan with a "maintained"
3119        // QueryLifetime, see comment in `plan_view`.
3120        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3121            &finishing,
3122            expr.arity()
3123        ));
3124        if expr.contains_parameters()? {
3125            return Err(PlanError::ParameterNotAllowed(
3126                "continual tasks".to_string(),
3127            ));
3128        }
3131        let expr = match desc.as_mut() {
3132            None => {
3133                desc = Some(desc_query);
3134                expr
3135            }
3136            Some(desc) => {
3137                // We specify the columns for DELETE, so if any column types don't
3138                // match, it must be because of an INSERT.
3139                if desc_query.arity() > desc.arity() {
3140                    sql_bail!(
3141                        "statement {}: INSERT has more expressions than target columns",
3142                        idx
3143                    );
3144                }
3145                if desc_query.arity() < desc.arity() {
3146                    sql_bail!(
3147                        "statement {}: INSERT has more target columns than expressions",
3148                        idx
3149                    );
3150                }
3151                // Ensure the types of the source query match the types of the target table,
3152                // installing assignment casts where necessary and possible.
3153                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3154                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3155                let expr = expr.map_err(|e| {
3156                    sql_err!(
3157                        "statement {}: column {} is of type {} but expression is of type {}",
3158                        idx,
3159                        desc.get_name(e.column).quoted(),
3160                        qcx.humanize_scalar_type(&e.target_type, false),
3161                        qcx.humanize_scalar_type(&e.source_type, false),
3162                    )
3163                })?;
3164
3165                // Update CT nullability as necessary. The arity checks above verified
3166                // that the two relation types have the same length.
3167                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3168                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3169                if updated {
3170                    let new_types = zip_types().map(|(ct, q)| {
3171                        let mut ct = ct.clone();
3172                        if q.nullable {
3173                            ct.nullable = true;
3174                        }
3175                        ct
3176                    });
3177                    *desc = RelationDesc::from_names_and_types(
3178                        desc.iter_names().cloned().zip_eq(new_types),
3179                    );
3180                }
3181
3182                expr
3183            }
3184        };
3185        match stmt {
3186            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3187            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3188        }
3189    }
3190    // TODO(ct3): Collect things by output and assert that there is only one (or
3191    // support multiple outputs).
3192    let expr = exprs
3193        .into_iter()
3194        .reduce(|acc, expr| acc.union(expr))
3195        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3196    let dependencies = expr
3197        .depends_on()
3198        .into_iter()
3199        .map(|gid| scx.catalog.resolve_item_id(&gid))
3200        .collect();
3201
3202    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3203    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3204    if let Some(dup) = column_names.iter().duplicates().next() {
3205        sql_bail!("column {} specified more than once", dup.quoted());
3206    }
3207
3208    // Check for an object in the catalog with this same name
3209    let name = match &ct_name {
3210        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3211        ResolvedItemName::ContinualTask { name, .. } => {
3212            let name = scx.allocate_qualified_name(name.clone())?;
3213            let full_name = scx.catalog.resolve_full_name(&name);
3214            let partial_name = PartialItemName::from(full_name.clone());
3215            // For PostgreSQL compatibility, we need to prevent creating this when there
3216            // is an existing object *or* type of the same name.
3217            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3218                return Err(PlanError::ItemAlreadyExists {
3219                    name: full_name.to_string(),
3220                    item_type: item.item_type(),
3221                });
3222            }
3223            name
3224        }
3225        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3226        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3227    };
3228
3229    let as_of = stmt.as_of.map(Timestamp::from);
3230    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3231        name,
3232        placeholder_id,
3233        desc,
3234        input_id: input.global_id(),
3235        with_snapshot: snapshot.unwrap_or(true),
3236        continual_task: MaterializedView {
3237            create_sql,
3238            expr,
3239            dependencies,
3240            column_names,
3241            cluster_id,
3242            non_null_assertions: Vec::new(),
3243            compaction_window: None,
3244            refresh_schedule: None,
3245            as_of,
3246        },
3247    }))
3248}
3249
3250fn continual_task_query<'a>(
3251    ct_name: &ResolvedItemName,
3252    stmt: &'a ast::ContinualTaskStmt<Aug>,
3253) -> Option<ast::Query<Aug>> {
3254    match stmt {
3255        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3256            table_name: _,
3257            columns,
3258            source,
3259            returning,
3260        }) => {
3261            if !columns.is_empty() || !returning.is_empty() {
3262                return None;
3263            }
3264            match source {
3265                ast::InsertSource::Query(query) => Some(query.clone()),
3266                ast::InsertSource::DefaultValues => None,
3267            }
3268        }
3269        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3270            table_name: _,
3271            alias,
3272            using,
3273            selection,
3274        }) => {
3275            if !using.is_empty() {
3276                return None;
3277            }
3278            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
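            // For example (illustrative): `DELETE FROM ct WHERE a < 5` becomes
            // `SELECT * FROM ct WHERE a < 5`, which the caller negates after
            // planning so its rows turn into retractions.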
3279            let from = ast::TableWithJoins {
3280                relation: ast::TableFactor::Table {
3281                    name: ct_name.clone(),
3282                    alias: alias.clone(),
3283                },
3284                joins: Vec::new(),
3285            };
3286            let select = ast::Select {
3287                from: vec![from],
3288                selection: selection.clone(),
3289                distinct: None,
3290                projection: vec![ast::SelectItem::Wildcard],
3291                group_by: Vec::new(),
3292                having: None,
3293                qualify: None,
3294                options: Vec::new(),
3295            };
3296            let query = ast::Query {
3297                ctes: ast::CteBlock::Simple(Vec::new()),
3298                body: ast::SetExpr::Select(Box::new(select)),
3299                order_by: Vec::new(),
3300                limit: None,
3301                offset: None,
3302            };
3303            // Then negate it to turn it into retractions (after planning it).
3304            Some(query)
3305        }
3306    }
3307}
3308
3309generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3310
3311pub fn describe_create_sink(
3312    _: &StatementContext,
3313    _: CreateSinkStatement<Aug>,
3314) -> Result<StatementDesc, PlanError> {
3315    Ok(StatementDesc::new(None))
3316}
3317
3318generate_extracted_config!(
3319    CreateSinkOption,
3320    (Snapshot, bool),
3321    (PartitionStrategy, String),
3322    (Version, u64),
3323    (CommitInterval, Duration)
3324);
3325
3326pub fn plan_create_sink(
3327    scx: &StatementContext,
3328    stmt: CreateSinkStatement<Aug>,
3329) -> Result<Plan, PlanError> {
3330    // Check for an object in the catalog with this same name
3331    let Some(name) = stmt.name.clone() else {
3332        return Err(PlanError::MissingName(CatalogItemType::Sink));
3333    };
3334    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3335    let full_name = scx.catalog.resolve_full_name(&name);
3336    let partial_name = PartialItemName::from(full_name.clone());
3337    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3338        return Err(PlanError::ItemAlreadyExists {
3339            name: full_name.to_string(),
3340            item_type: item.item_type(),
3341        });
3342    }
3343
3344    plan_sink(scx, stmt)
3345}
3346
3347/// Plans a sink as if it does not exist in the catalog, so that the planning
3348/// logic can be reused by both CREATE SINK and ALTER SINK planning. It is the
3349/// responsibility of the callers (plan_create_sink and plan_alter_sink) to
3350/// check for name collisions if this is important.
3351fn plan_sink(
3352    scx: &StatementContext,
3353    mut stmt: CreateSinkStatement<Aug>,
3354) -> Result<Plan, PlanError> {
3355    let CreateSinkStatement {
3356        name,
3357        in_cluster: _,
3358        from,
3359        connection,
3360        format,
3361        envelope,
3362        if_not_exists,
3363        with_options,
3364    } = stmt.clone();
3365
3366    let Some(name) = name else {
3367        return Err(PlanError::MissingName(CatalogItemType::Sink));
3368    };
3369    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3370
3371    let envelope = match envelope {
3372        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3373        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3374        None => sql_bail!("ENVELOPE clause is required"),
3375    };
3376
3377    let from_name = &from;
3378    let from = scx.get_item_by_resolved_name(&from)?;
3379    if from.id().is_system() {
3380        bail_unsupported!("creating a sink directly on a catalog object");
3381    }
3382    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
3383    let key_indices = match &connection {
3384        CreateSinkConnection::Kafka { key: Some(key), .. }
3385        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3386            let key_columns = key
3387                .key_columns
3388                .clone()
3389                .into_iter()
3390                .map(normalize::column_name)
3391                .collect::<Vec<_>>();
3392            let mut uniq = BTreeSet::new();
3393            for col in key_columns.iter() {
3394                if !uniq.insert(col) {
3395                    sql_bail!("duplicate column referenced in KEY: {}", col);
3396                }
3397            }
3398            let indices = key_columns
3399                .iter()
3400                .map(|col| -> Result<usize, PlanError> {
3401                    let name_idx =
3402                        desc.get_by_name(col)
3403                            .map(|(idx, _type)| idx)
3404                            .ok_or_else(|| {
3405                                sql_err!("column referenced in KEY does not exist: {}", col)
3406                            })?;
3407                    if desc.get_unambiguous_name(name_idx).is_none() {
3408                        sql_bail!("column referenced in KEY is ambiguous: {}", col);
3409                    }
3410                    Ok(name_idx)
3411                })
3412                .collect::<Result<Vec<_>, _>>()?;
3413            let is_valid_key = desc
3414                .typ()
3415                .keys
3416                .iter()
3417                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3418
3419            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3420                if key.not_enforced {
3421                    scx.catalog
3422                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3423                            key: key_columns.clone(),
3424                            name: name.item.clone(),
3425                        })
3426                } else {
3427                    return Err(PlanError::UpsertSinkWithInvalidKey {
3428                        name: from_name.full_name_str(),
3429                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3430                        valid_keys: desc
3431                            .typ()
3432                            .keys
3433                            .iter()
3434                            .map(|key| {
3435                                key.iter()
3436                                    .map(|col| desc.get_name(*col).as_str().into())
3437                                    .collect()
3438                            })
3439                            .collect(),
3440                    });
3441                }
3442            }
3443            Some(indices)
3444        }
3445        CreateSinkConnection::Kafka { key: None, .. }
3446        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3447    };
3448
3449    let headers_index = match &connection {
3450        CreateSinkConnection::Kafka {
3451            headers: Some(headers),
3452            ..
3453        } => {
3454            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3455
3456            match envelope {
3457                SinkEnvelope::Upsert => (),
3458                SinkEnvelope::Debezium => {
3459                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3460                }
3461            };
3462
3463            let headers = normalize::column_name(headers.clone());
3464            let (idx, ty) = desc
3465                .get_by_name(&headers)
3466                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3467
3468            if desc.get_unambiguous_name(idx).is_none() {
3469                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3470            }
3471
3472            match &ty.scalar_type {
3473                SqlScalarType::Map { value_type, .. }
3474                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3475                _ => sql_bail!(
3476                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3477                ),
3478            }
3479
3480            Some(idx)
3481        }
3482        _ => None,
3483    };
3484
3485    // pick the first valid natural relation key, if any
3486    let relation_key_indices = desc.typ().keys.get(0).cloned();
3487
3488    let key_desc_and_indices = key_indices.map(|key_indices| {
3489        let cols = desc
3490            .iter()
3491            .map(|(name, ty)| (name.clone(), ty.clone()))
3492            .collect::<Vec<_>>();
3493        let (names, types): (Vec<_>, Vec<_>) =
3494            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3495        let typ = SqlRelationType::new(types);
3496        (RelationDesc::new(typ, names), key_indices)
3497    });
3498
3499    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3500        return Err(PlanError::UpsertSinkWithoutKey);
3501    }
3502
3503    let connection_builder = match connection {
3504        CreateSinkConnection::Kafka {
3505            connection,
3506            options,
3507            ..
3508        } => kafka_sink_builder(
3509            scx,
3510            connection,
3511            options,
3512            format,
3513            relation_key_indices,
3514            key_desc_and_indices,
3515            headers_index,
3516            desc.into_owned(),
3517            envelope,
3518            from.id(),
3519        )?,
3520        CreateSinkConnection::Iceberg {
3521            connection,
3522            aws_connection,
3523            options,
3524            ..
3525        } => iceberg_sink_builder(
3526            scx,
3527            connection,
3528            aws_connection,
3529            options,
3530            relation_key_indices,
3531            key_desc_and_indices,
3532        )?,
3533    };
3534
3535    let CreateSinkOptionExtracted {
3536        snapshot,
3537        version,
3538        partition_strategy: _,
3539        seen: _,
3540        commit_interval: _,
3541    } = with_options.try_into()?;
3542
3543    // WITH SNAPSHOT defaults to true
3544    let with_snapshot = snapshot.unwrap_or(true);
3545    // VERSION defaults to 0
3546    let version = version.unwrap_or(0);
3547
3548    // We will rewrite the cluster if one is not provided, so we must use the
3549    // `in_cluster` value we plan to normalize when we canonicalize the create
3550    // statement.
3551    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3552    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3553
3554    Ok(Plan::CreateSink(CreateSinkPlan {
3555        name,
3556        sink: Sink {
3557            create_sql,
3558            from: from.global_id(),
3559            connection: connection_builder,
3560            envelope,
3561            version,
3562        },
3563        with_snapshot,
3564        if_not_exists,
3565        in_cluster: in_cluster.id(),
3566    }))
3567}
3568
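/// Builds the error reported when a user-specified key constraint conflicts
/// with the relation's existing keys.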
3569fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3570    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3571
3572    let existing_keys = desc
3573        .typ()
3574        .keys
3575        .iter()
3576        .map(|key_columns| {
3577            key_columns
3578                .iter()
3579                .map(|col| desc.get_name(*col).as_str())
3580                .join(", ")
3581        })
3582        .join(", ");
3583
3584    sql_err!(
3585        "Key constraint ({}) conflicts with existing key ({})",
3586        user_keys,
3587        existing_keys
3588    )
3589}
3590
3591/// Created by hand instead of via the `generate_extracted_config!` macro
3592/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
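///
/// Example option spellings handled here (illustrative): `AVRO KEY FULLNAME
/// '...'`, `NULL DEFAULTS`, `DOC ON ...`, and key/value `COMPATIBILITY LEVEL`
/// settings.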
3593#[derive(Debug, Default, PartialEq, Clone)]
3594pub struct CsrConfigOptionExtracted {
3595    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3596    pub(crate) avro_key_fullname: Option<String>,
3597    pub(crate) avro_value_fullname: Option<String>,
3598    pub(crate) null_defaults: bool,
3599    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3600    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3601    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3602    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3603}
3604
3605impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3606    type Error = crate::plan::PlanError;
3607    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3608        let mut extracted = CsrConfigOptionExtracted::default();
3609        let mut common_doc_comments = BTreeMap::new();
3610        for option in v {
3611            if !extracted.seen.insert(option.name.clone()) {
3612                return Err(PlanError::Unstructured({
3613                    format!("{} specified more than once", option.name)
3614                }));
3615            }
3616            let option_name = option.name.clone();
3617            let option_name_str = option_name.to_ast_string_simple();
3618            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3619                option_name: option_name.to_ast_string_simple(),
3620                err: e.into(),
3621            };
3622            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3623                val.map(|s| match s {
3624                    WithOptionValue::Value(Value::String(s)) => {
3625                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3626                    }
3627                    _ => Err("must be a string".to_string()),
3628                })
3629                .transpose()
3630                .map_err(PlanError::Unstructured)
3631                .map_err(better_error)
3632            };
3633            match option.name {
3634                CsrConfigOptionName::AvroKeyFullname => {
3635                    extracted.avro_key_fullname =
3636                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3637                }
3638                CsrConfigOptionName::AvroValueFullname => {
3639                    extracted.avro_value_fullname =
3640                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3641                }
3642                CsrConfigOptionName::NullDefaults => {
3643                    extracted.null_defaults =
3644                        <bool>::try_from_value(option.value).map_err(better_error)?;
3645                }
3646                CsrConfigOptionName::AvroDocOn(doc_on) => {
3647                    let value = String::try_from_value(option.value.ok_or_else(|| {
3648                        PlanError::InvalidOptionValue {
3649                            option_name: option_name_str,
3650                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3651                        }
3652                    })?)
3653                    .map_err(better_error)?;
3654                    let key = match doc_on.identifier {
3655                        DocOnIdentifier::Column(ast::ColumnName {
3656                            relation: ResolvedItemName::Item { id, .. },
3657                            column: ResolvedColumnReference::Column { name, index: _ },
3658                        }) => DocTarget::Field {
3659                            object_id: id,
3660                            column_name: name,
3661                        },
3662                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3663                            DocTarget::Type(id)
3664                        }
3665                        _ => unreachable!(),
3666                    };
3667
3668                    match doc_on.for_schema {
3669                        DocOnSchema::KeyOnly => {
3670                            extracted.key_doc_options.insert(key, value);
3671                        }
3672                        DocOnSchema::ValueOnly => {
3673                            extracted.value_doc_options.insert(key, value);
3674                        }
3675                        DocOnSchema::All => {
3676                            common_doc_comments.insert(key, value);
3677                        }
3678                    }
3679                }
3680                CsrConfigOptionName::KeyCompatibilityLevel => {
3681                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3682                }
3683                CsrConfigOptionName::ValueCompatibilityLevel => {
3684                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3685                }
3686            }
3687        }
3688
3689        for (key, value) in common_doc_comments {
3690            if !extracted.key_doc_options.contains_key(&key) {
3691                extracted.key_doc_options.insert(key.clone(), value.clone());
3692            }
3693            if !extracted.value_doc_options.contains_key(&key) {
3694                extracted.value_doc_options.insert(key, value);
3695            }
3696        }
3697        Ok(extracted)
3698    }
3699}
3700
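/// Builds the storage connection description for an Iceberg sink: validates
/// that the two referenced connections are an Iceberg catalog connection and
/// an AWS connection respectively, and extracts the required `TABLE` and
/// `NAMESPACE` options.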
3701fn iceberg_sink_builder(
3702    scx: &StatementContext,
3703    catalog_connection: ResolvedItemName,
3704    aws_connection: ResolvedItemName,
3705    options: Vec<IcebergSinkConfigOption<Aug>>,
3706    relation_key_indices: Option<Vec<usize>>,
3707    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3708) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3709    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3710    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3711    let catalog_connection_id = catalog_connection_item.id();
3712    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3713    let aws_connection_id = aws_connection_item.id();
3714    if !matches!(
3715        catalog_connection_item.connection()?,
3716        Connection::IcebergCatalog(_)
3717    ) {
3718        sql_bail!(
3719            "{} is not an iceberg catalog connection",
3720            scx.catalog
3721                .resolve_full_name(catalog_connection_item.name())
3722                .to_string()
3723                .quoted()
3724        );
3725    };
3726
3727    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3728        sql_bail!(
3729            "{} is not an AWS connection",
3730            scx.catalog
3731                .resolve_full_name(aws_connection_item.name())
3732                .to_string()
3733                .quoted()
3734        );
3735    }
3736
3737    let IcebergSinkConfigOptionExtracted {
3738        table,
3739        namespace,
3740        seen: _,
3741    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3742
3743    let Some(table) = table else {
3744        sql_bail!("Iceberg sink must specify TABLE");
3745    };
3746    let Some(namespace) = namespace else {
3747        sql_bail!("Iceberg sink must specify NAMESPACE");
3748    };
3749
3750    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3751        catalog_connection_id,
3752        catalog_connection: catalog_connection_id,
3753        aws_connection_id,
3754        aws_connection: aws_connection_id,
3755        table,
3756        namespace,
3757        relation_key_indices,
3758        key_desc_and_indices,
3759    }))
3760}
3761
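/// Builds the storage connection description for a Kafka sink: validates the
/// Kafka connection, extracts topic and ID-style options, plans the optional
/// `PARTITION BY` expression, and maps the `FORMAT` specifier onto key/value
/// encodings (including Avro schema generation against the CSR connection).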
3762fn kafka_sink_builder(
3763    scx: &StatementContext,
3764    connection: ResolvedItemName,
3765    options: Vec<KafkaSinkConfigOption<Aug>>,
3766    format: Option<FormatSpecifier<Aug>>,
3767    relation_key_indices: Option<Vec<usize>>,
3768    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3769    headers_index: Option<usize>,
3770    value_desc: RelationDesc,
3771    envelope: SinkEnvelope,
3772    sink_from: CatalogItemId,
3773) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3774    // Get Kafka connection.
3775    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3776    let connection_id = connection_item.id();
3777    match connection_item.connection()? {
3778        Connection::Kafka(_) => (),
3779        _ => sql_bail!(
3780            "{} is not a kafka connection",
3781            scx.catalog.resolve_full_name(connection_item.name())
3782        ),
3783    };
3784
3785    let KafkaSinkConfigOptionExtracted {
3786        topic,
3787        compression_type,
3788        partition_by,
3789        progress_group_id_prefix,
3790        transactional_id_prefix,
3791        legacy_ids,
3792        topic_config,
3793        topic_metadata_refresh_interval,
3794        topic_partition_count,
3795        topic_replication_factor,
3796        seen: _,
3797    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3798
3799    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3800        (Some(_), Some(true)) => {
3801            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3802        }
3803        (None, Some(true)) => KafkaIdStyle::Legacy,
3804        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3805    };
3806
3807    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3808        (Some(_), Some(true)) => {
3809            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3810        }
3811        (None, Some(true)) => KafkaIdStyle::Legacy,
3812        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3813    };
3814
3815    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3816
3817    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3818        // This is a librdkafka-enforced restriction that, if violated,
3819        // would result in a runtime error for the sink.
3820        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3821    }
3822
3823    let assert_positive = |val: Option<i32>, name: &str| {
3824        if let Some(val) = val {
3825            if val <= 0 {
3826                sql_bail!("{} must be a positive integer", name);
3827            }
3828        }
3829        val.map(NonNeg::try_from)
3830            .transpose()
3831            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3832    };
3833    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3834    let topic_replication_factor =
3835        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3836
3837    // Helper closure to parse Avro connection options for format specifiers
3838    // that use Avro for either key or value encoding.
3839    let gen_avro_schema_options = |conn| {
3840        let CsrConnectionAvro {
3841            connection:
3842                CsrConnection {
3843                    connection,
3844                    options,
3845                },
3846            seed,
3847            key_strategy,
3848            value_strategy,
3849        } = conn;
3850        if seed.is_some() {
3851            sql_bail!("SEED option does not make sense with sinks");
3852        }
3853        if key_strategy.is_some() {
3854            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3855        }
3856        if value_strategy.is_some() {
3857            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3858        }
3859
3860        let item = scx.get_item_by_resolved_name(&connection)?;
3861        let csr_connection = match item.connection()? {
3862            Connection::Csr(_) => item.id(),
3863            _ => {
3864                sql_bail!(
3865                    "{} is not a schema registry connection",
3866                    scx.catalog
3867                        .resolve_full_name(item.name())
3868                        .to_string()
3869                        .quoted()
3870                )
3871            }
3872        };
3873        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3874
3875        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3876            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3877        }
3878
3879        if key_desc_and_indices.is_some()
3880            && (extracted_options.avro_key_fullname.is_some()
3881                ^ extracted_options.avro_value_fullname.is_some())
3882        {
3883            sql_bail!(
3884                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3885            );
3886        }
3887
3888        Ok((csr_connection, extracted_options))
3889    };
3890
3891    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3892        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3893        Format::Bytes if desc.arity() == 1 => {
3894            let col_type = &desc.typ().column_types[0].scalar_type;
3895            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3896                bail_unsupported!(format!(
3897                    "BYTES format with non-encodable type: {:?}",
3898                    col_type
3899                ));
3900            }
3901
3902            Ok(KafkaSinkFormatType::Bytes)
3903        }
3904        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3905        Format::Bytes | Format::Text => {
3906            bail_unsupported!("BYTES or TEXT format with multiple columns")
3907        }
3908        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3909        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3910            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3911            let schema = if is_key {
3912                AvroSchemaGenerator::new(
3913                    desc.clone(),
3914                    false,
3915                    options.key_doc_options,
3916                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3917                    options.null_defaults,
3918                    Some(sink_from),
3919                    false,
3920                )?
3921                .schema()
3922                .to_string()
3923            } else {
3924                AvroSchemaGenerator::new(
3925                    desc.clone(),
3926                    matches!(envelope, SinkEnvelope::Debezium),
3927                    options.value_doc_options,
3928                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3929                    options.null_defaults,
3930                    Some(sink_from),
3931                    true,
3932                )?
3933                .schema()
3934                .to_string()
3935            };
3936            Ok(KafkaSinkFormatType::Avro {
3937                schema,
3938                compatibility_level: if is_key {
3939                    options.key_compatibility_level
3940                } else {
3941                    options.value_compatibility_level
3942                },
3943                csr_connection,
3944            })
3945        }
3946        format => bail_unsupported!(format!("sink format {:?}", format)),
3947    };
3948
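    // Plan the optional PARTITION BY expression against the sink's value
    // relation, casting it to a 64-bit unsigned integer. Under ENVELOPE
    // DEBEZIUM, referencing a non-key column is an error.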
3949    let partition_by = match &partition_by {
3950        Some(partition_by) => {
3951            let mut scope = Scope::from_source(None, value_desc.iter_names());
3952
3953            match envelope {
3954                SinkEnvelope::Upsert => (),
3955                SinkEnvelope::Debezium => {
3956                    let key_indices: HashSet<_> = key_desc_and_indices
3957                        .as_ref()
3958                        .map(|(_desc, indices)| indices.as_slice())
3959                        .unwrap_or_default()
3960                        .into_iter()
3961                        .collect();
3962                    for (i, item) in scope.items.iter_mut().enumerate() {
3963                        if !key_indices.contains(&i) {
3964                            item.error_if_referenced = Some(|_table, column| {
3965                                PlanError::InvalidPartitionByEnvelopeDebezium {
3966                                    column_name: column.to_string(),
3967                                }
3968                            });
3969                        }
3970                    }
3971                }
3972            };
3973
3974            let ecx = &ExprContext {
3975                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3976                name: "PARTITION BY",
3977                scope: &scope,
3978                relation_type: value_desc.typ(),
3979                allow_aggregates: false,
3980                allow_subqueries: false,
3981                allow_parameters: false,
3982                allow_windows: false,
3983            };
3984            let expr = plan_expr(ecx, partition_by)?.cast_to(
3985                ecx,
3986                CastContext::Assignment,
3987                &SqlScalarType::UInt64,
3988            )?;
3989            let expr = expr.lower_uncorrelated()?;
3990
3991            Some(expr)
3992        }
3993        _ => None,
3994    };
3995
3996    // Map from the format specifier of the statement to the individual key/value formats for the sink.
3997    let format = match format {
3998        Some(FormatSpecifier::KeyValue { key, value }) => {
3999            let key_format = match key_desc_and_indices.as_ref() {
4000                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4001                None => None,
4002            };
4003            KafkaSinkFormat {
4004                value_format: map_format(value, &value_desc, false)?,
4005                key_format,
4006            }
4007        }
4008        Some(FormatSpecifier::Bare(format)) => {
4009            let key_format = match key_desc_and_indices.as_ref() {
4010                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4011                None => None,
4012            };
4013            KafkaSinkFormat {
4014                value_format: map_format(format, &value_desc, false)?,
4015                key_format,
4016            }
4017        }
4018        None => bail_unsupported!("sink without format"),
4019    };
4020
4021    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4022        connection_id,
4023        connection: connection_id,
4024        format,
4025        topic: topic_name,
4026        relation_key_indices,
4027        key_desc_and_indices,
4028        headers_index,
4029        value_desc,
4030        partition_by,
4031        compression_type,
4032        progress_group_id,
4033        transactional_id,
4034        topic_options: KafkaTopicOptions {
4035            partition_count: topic_partition_count,
4036            replication_factor: topic_replication_factor,
4037            topic_config: topic_config.unwrap_or_default(),
4038        },
4039        topic_metadata_refresh_interval,
4040    }))
4041}
4042
4043pub fn describe_create_index(
4044    _: &StatementContext,
4045    _: CreateIndexStatement<Aug>,
4046) -> Result<StatementDesc, PlanError> {
4047    Ok(StatementDesc::new(None))
4048}
4049
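/// Plans a `CREATE INDEX` statement. When no key is given, the relation's
/// default key is used and the index is named `<item>_primary_idx`; otherwise
/// an explicit name is honored, or one is derived from the indexed
/// expressions following PostgreSQL's `<table>_<exprs>_idx` convention.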
4050pub fn plan_create_index(
4051    scx: &StatementContext,
4052    mut stmt: CreateIndexStatement<Aug>,
4053) -> Result<Plan, PlanError> {
4054    let CreateIndexStatement {
4055        name,
4056        on_name,
4057        in_cluster,
4058        key_parts,
4059        with_options,
4060        if_not_exists,
4061    } = &mut stmt;
4062    let on = scx.get_item_by_resolved_name(on_name)?;
4063    let on_item_type = on.item_type();
4064
4065    if !matches!(
4066        on_item_type,
4067        CatalogItemType::View
4068            | CatalogItemType::MaterializedView
4069            | CatalogItemType::Source
4070            | CatalogItemType::Table
4071    ) {
4072        sql_bail!(
4073            "index cannot be created on {} because it is a {}",
4074            on_name.full_name_str(),
4075            on.item_type()
4076        )
4077    }
4078
4079    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
4080
4081    let filled_key_parts = match key_parts {
4082        Some(kp) => kp.to_vec(),
4083        None => {
4084            // `key_parts` is None if we're creating a "default" index.
4085            on.desc(&scx.catalog.resolve_full_name(on.name()))?
4086                .typ()
4087                .default_key()
4088                .iter()
4089                .map(|i| match on_desc.get_unambiguous_name(*i) {
4090                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
4091                    _ => Expr::Value(Value::Number((i + 1).to_string())),
4092                })
4093                .collect()
4094        }
4095    };
4096    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4097
4098    let index_name = if let Some(name) = name {
4099        QualifiedItemName {
4100            qualifiers: on.name().qualifiers.clone(),
4101            item: normalize::ident(name.clone()),
4102        }
4103    } else {
4104        let mut idx_name = QualifiedItemName {
4105            qualifiers: on.name().qualifiers.clone(),
4106            item: on.name().item.clone(),
4107        };
4108        if key_parts.is_none() {
4109            // We're trying to create the "default" index.
4110            idx_name.item += "_primary_idx";
4111        } else {
            // Use the PostgreSQL scheme for automatically naming indexes:
            // `<table>_<_-separated indexed expressions>_idx`
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };
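
    // Illustrative examples (a sketch) of the naming rules above:
    // `CREATE DEFAULT INDEX ON t` yields an index named `t_primary_idx`,
    // while `CREATE INDEX ON t (a, b + 1)` follows the PostgreSQL-style
    // scheme and yields `t_a_expr_idx`. When `IF NOT EXISTS` is absent,
    // `find_available_name` disambiguates any collision with an existing
    // item name.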

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating indexes when
    // there is an existing object *or* type of the same name.
    //
    // Technically, we only need to prevent coexistence of indexes and types
    // that have an associated relation (record types but not list/map types).
    // Enforcing that would be more complicated, though. It's backwards
    // compatible to weaken this restriction in the future.
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Normalize `stmt`.
    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}

pub fn describe_create_type(
    _: &StatementContext,
    _: CreateTypeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate that the modifiers are actually valid.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating types when
    // there is an existing object *or* type of the same name.
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}
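
// Illustrative usage (a sketch) of the statements planned above:
//
//     CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
//     CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);
//
// Each option must reference a named, previously cataloged type;
// `validate_data_type` rejects anonymous types such as `int4 list`.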

generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);

#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}

#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    pub password: Option<Password>,
    pub scram_iterations: Option<NonZeroU32>,
    /// `nopassword` is set to true if the password from the parser is `None`.
    /// This is semantically different from not supplying a password at all,
    /// as it allows unsetting a password.
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}

fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non-inherit roles");
    }

    Ok(planned_attributes)
}
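
// Illustrative behavior (a sketch, assuming password authentication is
// enabled): `CREATE ROLE data_team WITH LOGIN PASSWORD 'hunter2'` plans to
// `PlannedRoleAttributes { login: Some(true), password: Some(..), .. }`,
// while a parsed `PASSWORD NULL` leaves `password` unset and records
// `nopassword: Some(true)` so that an existing password can be cleared.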

#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}

impl PlannedRoleVariable {
    pub fn name(&self) -> &str {
        match self {
            PlannedRoleVariable::Set { name, .. } => name,
            PlannedRoleVariable::Reset { name } => name,
        }
    }
}

fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
    let plan = match variable {
        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
            name: name.to_string(),
            value: scl::plan_set_variable_to(value)?,
        },
        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
            name: name.to_string(),
        },
    };
    Ok(plan)
}
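
// Illustrative usage (a sketch): `ALTER ROLE data_team SET cluster TO
// 'analytics'` plans to `PlannedRoleVariable::Set`, and `ALTER ROLE
// data_team RESET cluster` plans to `PlannedRoleVariable::Reset`.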

pub fn describe_create_role(
    _: &StatementContext,
    _: CreateRoleStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_role(
    scx: &StatementContext,
    CreateRoleStatement { name, options }: CreateRoleStatement,
) -> Result<Plan, PlanError> {
    let attributes = plan_role_attributes(options, scx)?;
    Ok(Plan::CreateRole(CreateRolePlan {
        name: normalize::ident(name),
        attributes: attributes.into(),
    }))
}

pub fn plan_create_network_policy(
    ctx: &StatementContext,
    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when creating network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;
        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }

    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
        name: normalize::ident(name),
        rules,
    }))
}
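
// Illustrative usage (a sketch), mirroring the documented syntax:
//
//     CREATE NETWORK POLICY office_access_policy (
//         RULES (
//             new_york (action = 'allow', direction = 'ingress', address = '1.2.3.4/28'),
//             minnesota (action = 'allow', direction = 'ingress', address = '2.3.4.5/32')
//         )
//     );
//
// Every rule must supply all three of `direction`, `action`, and `address`,
// and the rule count is capped by `max_rules_per_network_policy`.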

pub fn plan_alter_network_policy(
    ctx: &StatementContext,
    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;

    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when altering network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;

        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }
    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
        id: policy.id(),
        name: normalize::ident(name),
        rules,
    }))
}

pub fn describe_create_cluster(
    _: &StatementContext,
    _: CreateClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

// WARNING:
// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
// that option stays the same. If you were to give a default value here, then not giving that option
// to ALTER CLUSTER would always reset the value of that option to the default.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);

/// Convert a [`CreateClusterStatement`] into a [`Plan`].
///
/// The reverse of [`unplan_create_cluster`].
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    // Roundtrip through unplan and make sure that we end up with the same plan.
    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}
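
// A sketch of the self-check above, in pseudocode:
//
//     plan == plan(resolve(parse(unplan(plan))))
//
// That is, unplanning a managed-cluster plan back to SQL, reparsing, and
// replanning must reproduce the original plan; any divergence surfaces as
// `PlanError::Replan` instead of persisting a statement that would not
// round-trip.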

pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    let managed = managed.unwrap_or_else(|| replicas.is_none());

    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
            // we'll be able to remove the `DISK` option entirely.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // If we have a non-trivial schedule, then let's not have any replicas initially,
            // to avoid quickly going back and forth if the schedule doesn't want a replica
            // initially.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Plan OptimizerFeatureOverrides.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
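
// Illustrative usage (a sketch):
//
//     -- managed: Materialize provisions the replicas
//     CREATE CLUSTER analytics (SIZE = '100cc', REPLICATION FACTOR = 2);
//
//     -- unmanaged: replicas are spelled out; SIZE/REPLICATION FACTOR are rejected
//     CREATE CLUSTER custom REPLICAS (r1 (SIZE = '100cc'), r2 (SIZE = '100cc'));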

/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
///
/// The reverse of [`plan_create_cluster`].
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_reduce_reduction: _,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_repr_typecheck: _,
            } = optimizer_feature_overrides;
            // The ones from above that don't occur below are not wired up to cluster features.
            let features_extracted = ClusterFeatureExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // Replication factor cannot be explicitly specified with a refresh
            // schedule; it is always 1 or less.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}

generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);

fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // Common cases we expect end users to hit.
        (None, _, None, None, None) => {
            // We don't mention the unmanaged options in the error message
            // because they are only available in unsafe mode.
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            // When manually testing Materialize in unsafe mode, it's easy to
            // accidentally omit one of these options, so we try to produce
            // helpful error messages.
            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            // We don't bother trying to produce a more helpful error message
            // here because no user is likely to hit this path.
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
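
// Illustrative behavior (a sketch): an orchestrated replica is configured by
// `SIZE` (plus optional availability zone and billing options), while the
// unorchestrated form (unsafe mode only) instead supplies
// `STORAGECTL ADDRESSES` and `COMPUTECTL ADDRESSES` lists of equal length.
// Mixing options from the two forms is rejected.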

/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
///
/// The reverse of [`unplan_compute_replica_config`].
fn plan_compute_replica_config(
    introspection_interval: Option<OptionalDuration>,
    introspection_debugging: bool,
) -> Result<ComputeReplicaConfig, PlanError> {
    let introspection_interval = introspection_interval
        .map(|OptionalDuration(i)| i)
        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
    let introspection = match introspection_interval {
        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
            interval,
            debugging: introspection_debugging,
        }),
        None if introspection_debugging => {
            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
        }
        None => None,
    };
    let compute = ComputeReplicaConfig { introspection };
    Ok(compute)
}
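
// Illustrative behavior (a sketch): omitting `INTROSPECTION INTERVAL`
// defaults it to `DEFAULT_REPLICA_LOGGING_INTERVAL`; an interval that parses
// to `OptionalDuration(None)` disables introspection entirely, in which case
// `INTROSPECTION DEBUGGING` is rejected.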

/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
///
/// The reverse of [`plan_compute_replica_config`].
fn unplan_compute_replica_config(
    compute_replica_config: ComputeReplicaConfig,
) -> (Option<OptionalDuration>, bool) {
    match compute_replica_config.introspection {
        Some(ComputeReplicaIntrospectionConfig {
            debugging,
            interval,
        }) => (Some(OptionalDuration(Some(interval))), debugging),
        None => (Some(OptionalDuration(None)), false),
    }
}

/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
///
/// The reverse of [`unplan_cluster_schedule`].
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        // Otherwise we convert the `IntervalValue` to a `Duration`.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            if interval.months != 0 {
                // This limitation exists because we want this interval to be cleanly convertible
                // to a Unix epoch timestamp difference. When the interval involves months, that
                // is no longer possible, because months have variable lengths.
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}
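
// Illustrative usage (a sketch):
//
//     CREATE CLUSTER refresh_cluster (
//         SIZE = '100cc',
//         SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '10 minutes')
//     );
//
// The estimate must be non-negative, use units no larger than days, and fit
// in a `Duration` that converts back to an `Interval`.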

/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
///
/// The reverse of [`plan_cluster_schedule`].
fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
    match schedule {
        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
        ClusterSchedule::Refresh {
            hydration_time_estimate,
        } => {
            let interval = Interval::from_duration(&hydration_time_estimate)
                .expect("planning ensured that this is convertible back to Interval");
            let interval_value = literal::unplan_interval(&interval);
            ClusterScheduleOptionValue::Refresh {
                hydration_time_estimate: Some(interval_value),
            }
        }
    }
}

pub fn describe_create_cluster_replica(
    _: &StatementContext,
    _: CreateClusterReplicaStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}
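
// Illustrative behavior (a sketch): adding a replica to a cluster that hosts
// sources or sinks fails with `CreateReplicaFailStorageObjects` when the
// cluster already has a replica and multi-replica sources are not enabled.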

pub fn describe_create_secret(
    _: &StatementContext,
    _: CreateSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_secret(
    scx: &StatementContext,
    stmt: CreateSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSecretStatement {
        name,
        if_not_exists,
        value,
    } = &stmt;

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
    let mut create_sql_statement = stmt.clone();
    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
    let create_sql =
        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
    let secret_as = query::plan_secret_as(scx, value.clone())?;

    let secret = Secret {
        create_sql,
        secret_as,
    };

    Ok(Plan::CreateSecret(CreateSecretPlan {
        name,
        secret,
        if_not_exists: *if_not_exists,
    }))
}
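
// Illustrative behavior (a sketch): for `CREATE SECRET pg_pass AS 'hunter2'`,
// the recorded `create_sql` replaces the value with the literal '********';
// only the planned `secret_as` expression carries the real value onward.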

pub fn describe_create_connection(
    _: &StatementContext,
    _: CreateConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(CreateConnectionOption, (Validate, bool));

pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, overwrite the public key options based on the
    // connection details, in case we generated new keys during planning.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}
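
// Illustrative behavior (a sketch): for a statement like
//
//     CREATE CONNECTION tunnel TO SSH TUNNEL (HOST 'bastion', USER 'mz', PORT 22);
//
// planning may generate a fresh key pair, and the statement's options are
// rewritten above so the recorded `create_sql` carries `PUBLIC KEY 1` and
// `PUBLIC KEY 2` for the keys actually in use.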

fn plan_drop_database(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedDatabaseName,
    cascade: bool,
) -> Result<Option<DatabaseId>, PlanError> {
    Ok(match resolve_database(scx, name, if_exists)? {
        Some(database) => {
            if !cascade && database.has_schemas() {
                sql_bail!(
                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
                    name,
                );
            }
            Some(database.id())
        }
        None => None,
    })
}

pub fn describe_drop_objects(
    _: &StatementContext,
    _: DropObjectsStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}
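
// Illustrative behavior (a sketch): `DROP CLUSTER analytics CASCADE` records
// the cluster itself in `referenced_ids`, and `object_dependents` expands
// `drop_ids` to cover its replicas and every catalog item that lives on it.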

fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            // TODO(network_policy): When we support role based network policies, check if any role
            // currently has the specified policy set.
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}
5519
5520/// Returns `true` if the cluster has any object that requires a single replica.
5521/// Returns `false` if the cluster has no objects.
5522fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5523    // If this feature is enabled, then all objects support multiple replicas.
5524    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5525        false
5526    } else {
5527        // Otherwise, we check for the existence of sources or sinks.
5528        cluster.bound_objects().iter().any(|id| {
5529            let item = scx.catalog.get_item(id);
5530            matches!(
5531                item.item_type(),
5532                CatalogItemType::Sink | CatalogItemType::Source
5533            )
5534        })
5535    }
5536}
5537
5538fn plan_drop_cluster_replica(
5539    scx: &StatementContext,
5540    if_exists: bool,
5541    name: &QualifiedReplica,
5542) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5543    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5544    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5545}
5546
5547/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
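///
/// Illustrative example (hypothetical name): `DROP TABLE db.sch.t` resolves the
/// item, rejects system-required items, and, without `CASCADE`, errors if any
/// dependent item other than an index still uses it.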
5548fn plan_drop_item(
5549    scx: &StatementContext,
5550    object_type: ObjectType,
5551    if_exists: bool,
5552    name: UnresolvedItemName,
5553    cascade: bool,
5554) -> Result<Option<CatalogItemId>, PlanError> {
5555    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5556        Ok(r) => r,
5557        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5558        Err(PlanError::MismatchedObjectType {
5559            name,
5560            is_type: ObjectType::MaterializedView,
5561            expected_type: ObjectType::View,
5562        }) => {
5563            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5564        }
5565        e => e?,
5566    };
5567
5568    Ok(match resolved {
5569        Some(catalog_item) => {
5570            if catalog_item.id().is_system() {
5571                sql_bail!(
5572                    "cannot drop {} {} because it is required by the database system",
5573                    catalog_item.item_type(),
5574                    scx.catalog.minimal_qualification(catalog_item.name()),
5575                );
5576            }
5577
5578            if !cascade {
5579                for id in catalog_item.used_by() {
5580                    let dep = scx.catalog.get_item(id);
5581                    if dependency_prevents_drop(object_type, dep) {
5582                        return Err(PlanError::DependentObjectsStillExist {
5583                            object_type: catalog_item.item_type().to_string(),
5584                            object_name: scx
5585                                .catalog
5586                                .minimal_qualification(catalog_item.name())
5587                                .to_string(),
5588                            dependents: vec![(
5589                                dep.item_type().to_string(),
5590                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5591                            )],
5592                        });
5593                    }
5594                }
5595                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5596                //  relies on the entry. Unfortunately, we don't have that information readily available.
5597            }
5598            Some(catalog_item.id())
5599        }
5600        None => None,
5601    })
5602}
5603
5604/// Does the dependency `dep` prevent a non-cascading drop of an object of type `object_type`?
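///
/// Illustrative example: `DROP TABLE t` (without `CASCADE`) succeeds even while
/// an index on `t` exists, because indexes never prevent a drop; any other kind
/// of dependent item, or any dependency of a type, does.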
5605fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5606    match object_type {
5607        ObjectType::Type => true,
5608        _ => match dep.item_type() {
5609            CatalogItemType::Func
5610            | CatalogItemType::Table
5611            | CatalogItemType::Source
5612            | CatalogItemType::View
5613            | CatalogItemType::MaterializedView
5614            | CatalogItemType::Sink
5615            | CatalogItemType::Type
5616            | CatalogItemType::Secret
5617            | CatalogItemType::Connection
5618            | CatalogItemType::ContinualTask => true,
5619            CatalogItemType::Index => false,
5620        },
5621    }
5622}
5623
5624pub fn describe_alter_index_options(
5625    _: &StatementContext,
5626    _: AlterIndexStatement<Aug>,
5627) -> Result<StatementDesc, PlanError> {
5628    Ok(StatementDesc::new(None))
5629}
5630
5631pub fn describe_drop_owned(
5632    _: &StatementContext,
5633    _: DropOwnedStatement<Aug>,
5634) -> Result<StatementDesc, PlanError> {
5635    Ok(StatementDesc::new(None))
5636}
5637
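/// Plans `DROP OWNED BY`. An illustrative example, with a hypothetical role:
///
/// ```sql
/// DROP OWNED BY qa_user CASCADE;
/// ```
///
/// This drops every replica, cluster, item, schema, and database owned by the
/// role, and revokes the role's privileges and default privileges. It errors if
/// any system object would be affected.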
5638pub fn plan_drop_owned(
5639    scx: &StatementContext,
5640    drop: DropOwnedStatement<Aug>,
5641) -> Result<Plan, PlanError> {
5642    let cascade = drop.cascade();
5643    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5644    let mut drop_ids = Vec::new();
5645    let mut privilege_revokes = Vec::new();
5646    let mut default_privilege_revokes = Vec::new();
5647
5648    fn update_privilege_revokes(
5649        object_id: SystemObjectId,
5650        privileges: &PrivilegeMap,
5651        role_ids: &BTreeSet<RoleId>,
5652        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5653    ) {
5654        privilege_revokes.extend(iter::zip(
5655            iter::repeat(object_id),
5656            privileges
5657                .all_values()
5658                .filter(|privilege| role_ids.contains(&privilege.grantee))
5659                .cloned(),
5660        ));
5661    }
5662
5663    // Replicas
5664    for replica in scx.catalog.get_cluster_replicas() {
5665        if role_ids.contains(&replica.owner_id()) {
5666            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5667        }
5668    }
5669
5670    // Clusters
5671    for cluster in scx.catalog.get_clusters() {
5672        if role_ids.contains(&cluster.owner_id()) {
5673            // Note: CASCADE is not required for replicas.
5674            if !cascade {
5675                let non_owned_bound_objects: Vec<_> = cluster
5676                    .bound_objects()
5677                    .into_iter()
5678                    .map(|item_id| scx.catalog.get_item(item_id))
5679                    .filter(|item| !role_ids.contains(&item.owner_id()))
5680                    .collect();
5681                if !non_owned_bound_objects.is_empty() {
5682                    let names: Vec<_> = non_owned_bound_objects
5683                        .into_iter()
5684                        .map(|item| {
5685                            (
5686                                item.item_type().to_string(),
5687                                scx.catalog.resolve_full_name(item.name()).to_string(),
5688                            )
5689                        })
5690                        .collect();
5691                    return Err(PlanError::DependentObjectsStillExist {
5692                        object_type: "cluster".to_string(),
5693                        object_name: cluster.name().to_string(),
5694                        dependents: names,
5695                    });
5696                }
5697            }
5698            drop_ids.push(cluster.id().into());
5699        }
5700        update_privilege_revokes(
5701            SystemObjectId::Object(cluster.id().into()),
5702            cluster.privileges(),
5703            &role_ids,
5704            &mut privilege_revokes,
5705        );
5706    }
5707
5708    // Items
5709    for item in scx.catalog.get_items() {
5710        if role_ids.contains(&item.owner_id()) {
5711            if !cascade {
5712                // Checks if any items still depend on this one, returning an error if so.
5713                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5714                    let non_owned_dependencies: Vec<_> = used_by
5715                        .into_iter()
5716                        .map(|item_id| scx.catalog.get_item(item_id))
5717                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5718                        .filter(|item| !role_ids.contains(&item.owner_id()))
5719                        .collect();
5720                    if !non_owned_dependencies.is_empty() {
5721                        let names: Vec<_> = non_owned_dependencies
5722                            .into_iter()
5723                            .map(|item| {
5724                                let item_typ = item.item_type().to_string();
5725                                let item_name =
5726                                    scx.catalog.resolve_full_name(item.name()).to_string();
5727                                (item_typ, item_name)
5728                            })
5729                            .collect();
5730                        Err(PlanError::DependentObjectsStillExist {
5731                            object_type: item.item_type().to_string(),
5732                            object_name: scx
5733                                .catalog
5734                                .resolve_full_name(item.name())
5735                                .to_string(),
5737                            dependents: names,
5738                        })
5739                    } else {
5740                        Ok(())
5741                    }
5742                };
5743
5744                // When this item gets dropped it will also drop its progress source, so we need to
5745                // check the users of that source as well.
5746                if let Some(id) = item.progress_id() {
5747                    let progress_item = scx.catalog.get_item(&id);
5748                    check_if_dependents_exist(progress_item.used_by())?;
5749                }
5750                check_if_dependents_exist(item.used_by())?;
5751            }
5752            drop_ids.push(item.id().into());
5753        }
5754        update_privilege_revokes(
5755            SystemObjectId::Object(item.id().into()),
5756            item.privileges(),
5757            &role_ids,
5758            &mut privilege_revokes,
5759        );
5760    }
5761
5762    // Schemas
5763    for schema in scx.catalog.get_schemas() {
5764        if !schema.id().is_temporary() {
5765            if role_ids.contains(&schema.owner_id()) {
5766                if !cascade {
5767                    let non_owned_dependencies: Vec<_> = schema
5768                        .item_ids()
5769                        .map(|item_id| scx.catalog.get_item(&item_id))
5770                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5771                        .filter(|item| !role_ids.contains(&item.owner_id()))
5772                        .collect();
5773                    if !non_owned_dependencies.is_empty() {
5774                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5775                        sql_bail!(
5776                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5777                            full_schema_name.to_string().quoted()
5778                        );
5779                    }
5780                }
5781                drop_ids.push((*schema.database(), *schema.id()).into())
5782            }
5783            update_privilege_revokes(
5784                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5785                schema.privileges(),
5786                &role_ids,
5787                &mut privilege_revokes,
5788            );
5789        }
5790    }
5791
5792    // Databases
5793    for database in scx.catalog.get_databases() {
5794        if role_ids.contains(&database.owner_id()) {
5795            if !cascade {
5796                let non_owned_schemas: Vec<_> = database
5797                    .schemas()
5798                    .into_iter()
5799                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5800                    .collect();
5801                if !non_owned_schemas.is_empty() {
5802                    sql_bail!(
5803                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5804                        database.name().quoted(),
5805                    );
5806                }
5807            }
5808            drop_ids.push(database.id().into());
5809        }
5810        update_privilege_revokes(
5811            SystemObjectId::Object(database.id().into()),
5812            database.privileges(),
5813            &role_ids,
5814            &mut privilege_revokes,
5815        );
5816    }
5817
5818    // System
5819    update_privilege_revokes(
5820        SystemObjectId::System,
5821        scx.catalog.get_system_privileges(),
5822        &role_ids,
5823        &mut privilege_revokes,
5824    );
5825
5826    for (default_privilege_object, default_privilege_acl_items) in
5827        scx.catalog.get_default_privileges()
5828    {
5829        for default_privilege_acl_item in default_privilege_acl_items {
5830            if role_ids.contains(&default_privilege_object.role_id)
5831                || role_ids.contains(&default_privilege_acl_item.grantee)
5832            {
5833                default_privilege_revokes.push((
5834                    default_privilege_object.clone(),
5835                    default_privilege_acl_item.clone(),
5836                ));
5837            }
5838        }
5839    }
5840
5841    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5842
5843    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5844    if !system_ids.is_empty() {
5845        let mut owners = system_ids
5846            .into_iter()
5847            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5848            .collect::<BTreeSet<_>>()
5849            .into_iter()
5850            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5851        sql_bail!(
5852            "cannot drop objects owned by role {} because they are required by the database system",
5853            owners.join(", "),
5854        );
5855    }
5856
5857    Ok(Plan::DropOwned(DropOwnedPlan {
5858        role_ids: role_ids.into_iter().collect(),
5859        drop_ids,
5860        privilege_revokes,
5861        default_privilege_revokes,
5862    }))
5863}
5864
5865fn plan_retain_history_option(
5866    scx: &StatementContext,
5867    retain_history: Option<OptionalDuration>,
5868) -> Result<Option<CompactionWindow>, PlanError> {
5869    if let Some(OptionalDuration(lcw)) = retain_history {
5870        Ok(Some(plan_retain_history(scx, lcw)?))
5871    } else {
5872        Ok(None)
5873    }
5874}
5875
5876// Converts a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5877// `DisableCompaction`. A zero duration is an error here, because the `OptionalDuration` type
5878// already converts a zero duration into `None` before this function is reached. This function
5879// must not be called in the `RESET (RETAIN HISTORY)` path, which should instead be handled by
5880// the outer `Option<OptionalDuration>` being `None`.
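// Illustrative example: `RETAIN HISTORY FOR '1hr'` arrives here as
// `Some(Duration::from_secs(3600))` and becomes a one-hour compaction window, while
// `RETAIN HISTORY FOR '0'` arrives as `None` and maps to
// `CompactionWindow::DisableCompaction` (behind a feature flag).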
5881fn plan_retain_history(
5882    scx: &StatementContext,
5883    lcw: Option<Duration>,
5884) -> Result<CompactionWindow, PlanError> {
5885    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5886    match lcw {
5887        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
5888        // disable compaction), and should never occur here. Furthermore, some things actually do
5889        // break when this is set to real zero:
5890        // https://github.com/MaterializeInc/database-issues/issues/3798.
5891        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5892            option_name: "RETAIN HISTORY".to_string(),
5893            err: Box::new(PlanError::Unstructured(
5894                "internal error: unexpectedly zero".to_string(),
5895            )),
5896        }),
5897        Some(duration) => {
5898            // Error if the duration is below the default and enable_unlimited_retain_history is
5899            // not set (setting that flag should only be possible during testing).
5900            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5901                && scx
5902                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5903                    .is_err()
5904            {
5905                return Err(PlanError::RetainHistoryLow {
5906                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5907                });
5908            }
5909            Ok(duration.try_into()?)
5910        }
5911        // In the past, `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction is
5912        // usually a bad choice, so prevent it unless the unlimited retain history flag is set.
5913        None => {
5914            if scx
5915                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5916                .is_err()
5917            {
5918                Err(PlanError::RetainHistoryRequired)
5919            } else {
5920                Ok(CompactionWindow::DisableCompaction)
5921            }
5922        }
5923    }
5924}
5925
5926generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5927
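/// Plans the `WITH (...)` options of `CREATE INDEX`. Only `RETAIN HISTORY` is
/// extracted here; an illustrative example:
///
/// ```sql
/// CREATE INDEX t_idx ON t (a) WITH (RETAIN HISTORY FOR '2h');
/// ```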
5928fn plan_index_options(
5929    scx: &StatementContext,
5930    with_opts: Vec<IndexOption<Aug>>,
5931) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5932    if !with_opts.is_empty() {
5933        // Index options are not durable.
5934        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5935    }
5936
5937    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5938
5939    let mut out = Vec::with_capacity(1);
5940    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5941        out.push(crate::plan::IndexOption::RetainHistory(cw));
5942    }
5943    Ok(out)
5944}
5945
5946generate_extracted_config!(
5947    TableOption,
5948    (PartitionBy, Vec<Ident>),
5949    (RetainHistory, OptionalDuration),
5950    (RedactedTest, String)
5951);
5952
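/// Plans the `WITH (...)` options of `CREATE TABLE`; an illustrative example:
///
/// ```sql
/// CREATE TABLE t (a int) WITH (RETAIN HISTORY FOR '30d');
/// ```
///
/// `PARTITION BY` and the redacted-test option are feature-flagged, and only
/// `RETAIN HISTORY` currently produces a plan-level table option.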
5953fn plan_table_options(
5954    scx: &StatementContext,
5955    desc: &RelationDesc,
5956    with_opts: Vec<TableOption<Aug>>,
5957) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5958    let TableOptionExtracted {
5959        partition_by,
5960        retain_history,
5961        redacted_test,
5962        ..
5963    }: TableOptionExtracted = with_opts.try_into()?;
5964
5965    if let Some(partition_by) = partition_by {
5966        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5967        check_partition_by(desc, partition_by)?;
5968    }
5969
5970    if redacted_test.is_some() {
5971        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5972    }
5973
5974    let mut out = Vec::with_capacity(1);
5975    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5976        out.push(crate::plan::TableOption::RetainHistory(cw));
5977    }
5978    Ok(out)
5979}
5980
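/// Plans `ALTER INDEX ... SET/RESET (...)`; illustrative examples:
///
/// ```sql
/// ALTER INDEX t_idx SET (RETAIN HISTORY FOR '1h');
/// ALTER INDEX t_idx RESET (RETAIN HISTORY);
/// ```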
5981pub fn plan_alter_index_options(
5982    scx: &mut StatementContext,
5983    AlterIndexStatement {
5984        index_name,
5985        if_exists,
5986        action,
5987    }: AlterIndexStatement<Aug>,
5988) -> Result<Plan, PlanError> {
5989    let object_type = ObjectType::Index;
5990    match action {
5991        AlterIndexAction::ResetOptions(options) => {
5992            let mut options = options.into_iter();
5993            if let Some(opt) = options.next() {
5994                match opt {
5995                    IndexOptionName::RetainHistory => {
5996                        if options.next().is_some() {
5997                            sql_bail!("RETAIN HISTORY must be the only option");
5998                        }
5999                        return alter_retain_history(
6000                            scx,
6001                            object_type,
6002                            if_exists,
6003                            UnresolvedObjectName::Item(index_name),
6004                            None,
6005                        );
6006                    }
6007                }
6008            }
6009            sql_bail!("expected option");
6010        }
6011        AlterIndexAction::SetOptions(options) => {
6012            let mut options = options.into_iter();
6013            if let Some(opt) = options.next() {
6014                match opt.name {
6015                    IndexOptionName::RetainHistory => {
6016                        if options.next().is_some() {
6017                            sql_bail!("RETAIN HISTORY must be the only option");
6018                        }
6019                        return alter_retain_history(
6020                            scx,
6021                            object_type,
6022                            if_exists,
6023                            UnresolvedObjectName::Item(index_name),
6024                            opt.value,
6025                        );
6026                    }
6027                }
6028            }
6029            sql_bail!("expected option");
6030        }
6031    }
6032}
6033
6034pub fn describe_alter_cluster_set_options(
6035    _: &StatementContext,
6036    _: AlterClusterStatement<Aug>,
6037) -> Result<StatementDesc, PlanError> {
6038    Ok(StatementDesc::new(None))
6039}
6040
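/// Plans `ALTER CLUSTER`; illustrative examples, with a hypothetical name:
///
/// ```sql
/// ALTER CLUSTER prod SET (REPLICATION FACTOR 2);
/// ALTER CLUSTER prod RESET (SCHEDULE);
/// ```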
6041pub fn plan_alter_cluster(
6042    scx: &mut StatementContext,
6043    AlterClusterStatement {
6044        name,
6045        action,
6046        if_exists,
6047    }: AlterClusterStatement<Aug>,
6048) -> Result<Plan, PlanError> {
6049    let cluster = match resolve_cluster(scx, &name, if_exists)? {
6050        Some(entry) => entry,
6051        None => {
6052            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6053                name: name.to_ast_string_simple(),
6054                object_type: ObjectType::Cluster,
6055            });
6056
6057            return Ok(Plan::AlterNoop(AlterNoopPlan {
6058                object_type: ObjectType::Cluster,
6059            }));
6060        }
6061    };
6062
6063    let mut options: PlanClusterOption = Default::default();
6064    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6065
6066    match action {
6067        AlterClusterAction::SetOptions {
6068            options: set_options,
6069            with_options,
6070        } => {
6071            let ClusterOptionExtracted {
6072                availability_zones,
6073                introspection_debugging,
6074                introspection_interval,
6075                managed,
6076                replicas: replica_defs,
6077                replication_factor,
6078                seen: _,
6079                size,
6080                disk,
6081                schedule,
6082                workload_class,
6083            }: ClusterOptionExtracted = set_options.try_into()?;
6084
6085            if !scx.catalog.active_role_id().is_system() {
6086                if workload_class.is_some() {
6087                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6088                }
6089            }
6090
6091            match managed.unwrap_or_else(|| cluster.is_managed()) {
6092                true => {
6093                    let alter_strategy_extracted =
6094                        ClusterAlterOptionExtracted::try_from(with_options)?;
6095                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6096
6097                    match alter_strategy {
6098                        AlterClusterPlanStrategy::None => {}
6099                        _ => {
6100                            scx.require_feature_flag(
6101                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6102                            )?;
6103                        }
6104                    }
6105
6106                    if replica_defs.is_some() {
6107                        sql_bail!("REPLICAS not supported for managed clusters");
6108                    }
6109                    if schedule.is_some()
6110                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6111                    {
6112                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6113                    }
6114
6115                    if let Some(replication_factor) = replication_factor {
6116                        if schedule.is_some()
6117                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6118                        {
6119                            sql_bail!(
6120                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6121                            );
6122                        }
6123                        if let Some(current_schedule) = cluster.schedule() {
6124                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6125                                sql_bail!(
6126                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6127                                );
6128                            }
6129                        }
6130
6131                        let internal_replica_count =
6132                            cluster.replicas().iter().filter(|r| r.internal()).count();
6133                        let hypothetical_replica_count =
6134                            internal_replica_count + usize::cast_from(replication_factor);
6135
6136                        // Total number of replicas running is internal replicas
6137                        // + replication factor.
6138                        if contains_single_replica_objects(scx, cluster)
6139                            && hypothetical_replica_count > 1
6140                        {
6141                            return Err(PlanError::CreateReplicaFailStorageObjects {
6142                                current_replica_count: cluster.replica_ids().iter().count(),
6143                                internal_replica_count,
6144                                hypothetical_replica_count,
6145                            });
6146                        }
6147                    } else if alter_strategy.is_some() {
6148                        // Alter strategies other than `None` stand up pending replicas of the new
6149                        // configuration, which would violate the single-replica constraint for sources.
6150                        // If the cluster hosts any storage objects (sources or sinks), fail outright.
6151                        let internal_replica_count =
6152                            cluster.replicas().iter().filter(|r| r.internal()).count();
6153                        let hypothetical_replica_count = internal_replica_count * 2;
6154                        if contains_single_replica_objects(scx, cluster) {
6155                            return Err(PlanError::CreateReplicaFailStorageObjects {
6156                                current_replica_count: cluster.replica_ids().iter().count(),
6157                                internal_replica_count,
6158                                hypothetical_replica_count,
6159                            });
6160                        }
6161                    }
6162                }
6163                false => {
6164                    if !alter_strategy.is_none() {
6165                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6166                    }
6167                    if availability_zones.is_some() {
6168                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6169                    }
6170                    if replication_factor.is_some() {
6171                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6172                    }
6173                    if introspection_debugging.is_some() {
6174                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6175                    }
6176                    if introspection_interval.is_some() {
6177                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6178                    }
6179                    if size.is_some() {
6180                        sql_bail!("SIZE not supported for unmanaged clusters");
6181                    }
6182                    if disk.is_some() {
6183                        sql_bail!("DISK not supported for unmanaged clusters");
6184                    }
6185                    if schedule.is_some()
6186                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6187                    {
6188                        sql_bail!(
6189                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6190                        );
6191                    }
6192                    if let Some(current_schedule) = cluster.schedule() {
6193                        if !matches!(current_schedule, ClusterSchedule::Manual)
6194                            && schedule.is_none()
6195                        {
6196                            sql_bail!(
6197                                "when switching a cluster to unmanaged, if the managed \
6198                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6199                                explicitly set the SCHEDULE to MANUAL"
6200                            );
6201                        }
6202                    }
6203                }
6204            }
6205
6206            let mut replicas = vec![];
6207            for ReplicaDefinition { name, options } in
6208                replica_defs.into_iter().flat_map(Vec::into_iter)
6209            {
6210                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6211            }
6212
6213            if let Some(managed) = managed {
6214                options.managed = AlterOptionParameter::Set(managed);
6215            }
6216            if let Some(replication_factor) = replication_factor {
6217                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6218            }
6219            if let Some(size) = &size {
6220                options.size = AlterOptionParameter::Set(size.clone());
6221            }
6222            if let Some(availability_zones) = availability_zones {
6223                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6224            }
6225            if let Some(introspection_debugging) = introspection_debugging {
6226                options.introspection_debugging =
6227                    AlterOptionParameter::Set(introspection_debugging);
6228            }
6229            if let Some(introspection_interval) = introspection_interval {
6230                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6231            }
6232            if disk.is_some() {
6233                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6234                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
6235                // we'll be able to remove the `DISK` option entirely.
6236                let size = size.as_deref().unwrap_or_else(|| {
6237                    cluster.managed_size().expect("cluster known to be managed")
6238                });
6239                if scx.catalog.is_cluster_size_cc(size) {
6240                    sql_bail!(
6241                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6242                    );
6243                }
6244
6245                scx.catalog
6246                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6247            }
6248            if !replicas.is_empty() {
6249                options.replicas = AlterOptionParameter::Set(replicas);
6250            }
6251            if let Some(schedule) = schedule {
6252                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6253            }
6254            if let Some(workload_class) = workload_class {
6255                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6256            }
6257        }
6258        AlterClusterAction::ResetOptions(reset_options) => {
6259            use AlterOptionParameter::Reset;
6260            use ClusterOptionName::*;
6261
6262            if !scx.catalog.active_role_id().is_system() {
6263                if reset_options.contains(&WorkloadClass) {
6264                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6265                }
6266            }
6267
6268            for option in reset_options {
6269                match option {
6270                    AvailabilityZones => options.availability_zones = Reset,
6271                    Disk => scx
6272                        .catalog
6273                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6274                    IntrospectionInterval => options.introspection_interval = Reset,
6275                    IntrospectionDebugging => options.introspection_debugging = Reset,
6276                    Managed => options.managed = Reset,
6277                    Replicas => options.replicas = Reset,
6278                    ReplicationFactor => options.replication_factor = Reset,
6279                    Size => options.size = Reset,
6280                    Schedule => options.schedule = Reset,
6281                    WorkloadClass => options.workload_class = Reset,
6282                }
6283            }
6284        }
6285    }
6286    Ok(Plan::AlterCluster(AlterClusterPlan {
6287        id: cluster.id(),
6288        name: cluster.name().to_string(),
6289        options,
6290        strategy: alter_strategy,
6291    }))
6292}
6293
6294pub fn describe_alter_set_cluster(
6295    _: &StatementContext,
6296    _: AlterSetClusterStatement<Aug>,
6297) -> Result<StatementDesc, PlanError> {
6298    Ok(StatementDesc::new(None))
6299}
6300
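/// Plans `ALTER ... SET CLUSTER`; an illustrative example, with hypothetical
/// names:
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET CLUSTER other_cluster;
/// ```
///
/// Only materialized views are currently supported; indexes, sinks, and sources
/// return an "unsupported" error instead.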
6301pub fn plan_alter_item_set_cluster(
6302    scx: &StatementContext,
6303    AlterSetClusterStatement {
6304        if_exists,
6305        set_cluster: in_cluster_name,
6306        name,
6307        object_type,
6308    }: AlterSetClusterStatement<Aug>,
6309) -> Result<Plan, PlanError> {
6310    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6311
6312    let object_type = object_type.into();
6313
6314    // Prevent access to `SET CLUSTER` for unsupported objects.
6315    match object_type {
6316        ObjectType::MaterializedView => {}
6317        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6318            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6319        }
6320        _ => {
6321            bail_never_supported!(
6322                format!("ALTER {object_type} SET CLUSTER"),
6323                "sql/alter-set-cluster/",
6324                format!("{object_type} has no associated cluster")
6325            )
6326        }
6327    }
6328
6329    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6330
6331    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6332        Some(entry) => {
6333            let current_cluster = entry.cluster_id();
6334            let Some(current_cluster) = current_cluster else {
6335                sql_bail!("No cluster associated with {name}");
6336            };
6337
6338            if current_cluster == in_cluster.id() {
6339                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6340            } else {
6341                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6342                    id: entry.id(),
6343                    set_cluster: in_cluster.id(),
6344                }))
6345            }
6346        }
6347        None => {
6348            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6349                name: name.to_ast_string_simple(),
6350                object_type,
6351            });
6352
6353            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6354        }
6355    }
6356}
6357
6358pub fn describe_alter_object_rename(
6359    _: &StatementContext,
6360    _: AlterObjectRenameStatement,
6361) -> Result<StatementDesc, PlanError> {
6362    Ok(StatementDesc::new(None))
6363}
6364
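/// Plans `ALTER ... RENAME TO`, dispatching on the object type; illustrative
/// examples:
///
/// ```sql
/// ALTER TABLE t RENAME TO t2;
/// ALTER CLUSTER REPLICA c.r1 RENAME TO r2;
/// ```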
6365pub fn plan_alter_object_rename(
6366    scx: &mut StatementContext,
6367    AlterObjectRenameStatement {
6368        name,
6369        object_type,
6370        to_item_name,
6371        if_exists,
6372    }: AlterObjectRenameStatement,
6373) -> Result<Plan, PlanError> {
6374    let object_type = object_type.into();
6375    match (object_type, name) {
6376        (
6377            ObjectType::View
6378            | ObjectType::MaterializedView
6379            | ObjectType::Table
6380            | ObjectType::Source
6381            | ObjectType::Index
6382            | ObjectType::Sink
6383            | ObjectType::Secret
6384            | ObjectType::Connection,
6385            UnresolvedObjectName::Item(name),
6386        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6387        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6388            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6389        }
6390        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6391            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6392        }
6393        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6394            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6395        }
6396        (object_type, name) => {
6397            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6398        }
6399    }
6400}
6401
6402pub fn plan_alter_schema_rename(
6403    scx: &mut StatementContext,
6404    name: UnresolvedSchemaName,
6405    to_schema_name: Ident,
6406    if_exists: bool,
6407) -> Result<Plan, PlanError> {
6408    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6409        let object_type = ObjectType::Schema;
6410        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6411            name: name.to_ast_string_simple(),
6412            object_type,
6413        });
6414        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6415    };
6416
6417    // Make sure the name is unique.
6418    if scx
6419        .resolve_schema_in_database(&db_spec, &to_schema_name)
6420        .is_ok()
6421    {
6422        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6423            to_schema_name.clone().into_string(),
6424        )));
6425    }
6426
6427    // Prevent users from renaming system related schemas.
6428    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6429    if schema.id().is_system() {
6430        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6431    }
6432
6433    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6434        cur_schema_spec: (db_spec, schema_spec),
6435        new_schema_name: to_schema_name.into_string(),
6436    }))
6437}
6438
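/// Plans `ALTER SCHEMA ... SWAP WITH ...`; an illustrative example:
///
/// ```sql
/// ALTER SCHEMA blue SWAP WITH green;
/// ```
///
/// The swap is performed via a rename through a uniquely named
/// `mz_schema_swap_<suffix>` temporary schema.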
6439pub fn plan_alter_schema_swap<F>(
6440    scx: &mut StatementContext,
6441    name_a: UnresolvedSchemaName,
6442    name_b: Ident,
6443    gen_temp_suffix: F,
6444) -> Result<Plan, PlanError>
6445where
6446    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6447{
6448    let schema_a = scx.resolve_schema(name_a.clone())?;
6449
6450    let db_spec = schema_a.database().clone();
6451    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6452        sql_bail!("cannot swap schemas that are in the ambient database");
6453    };
6454    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6455
6456    // We cannot swap system schemas.
6457    if schema_a.id().is_system() || schema_b.id().is_system() {
6458        bail_never_supported!("swapping a system schema".to_string())
6459    }
6460
6461    // Generate a temporary name we can swap schema_a to.
6462    //
6463    // `check` returns whether the temp schema name is free to use.
6464    let check = |temp_suffix: &str| {
6465        let mut temp_name = ident!("mz_schema_swap_");
6466        temp_name.append_lossy(temp_suffix);
6467        scx.resolve_schema_in_database(&db_spec, &temp_name)
6468            .is_err()
6469    };
6470    let temp_suffix = gen_temp_suffix(&check)?;
6471    let name_temp = format!("mz_schema_swap_{temp_suffix}");
6472
6473    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6474        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6475        schema_a_name: schema_a.name().schema.to_string(),
6476        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6477        schema_b_name: schema_b.name().schema.to_string(),
6478        name_temp,
6479    }))
6480}
6481
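/// Plans `ALTER ... RENAME TO` for catalog items; an illustrative example:
///
/// ```sql
/// ALTER VIEW v RENAME TO v2;
/// ```
///
/// The new name must not collide with an existing item, nor, for PostgreSQL
/// compatibility, with an existing type.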
6482pub fn plan_alter_item_rename(
6483    scx: &mut StatementContext,
6484    object_type: ObjectType,
6485    name: UnresolvedItemName,
6486    to_item_name: Ident,
6487    if_exists: bool,
6488) -> Result<Plan, PlanError> {
6489    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6490        Ok(r) => r,
6491        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6492        Err(PlanError::MismatchedObjectType {
6493            name,
6494            is_type: ObjectType::MaterializedView,
6495            expected_type: ObjectType::View,
6496        }) => {
6497            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6498        }
6499        e => e?,
6500    };
6501
6502    match resolved {
6503        Some(entry) => {
6504            let full_name = scx.catalog.resolve_full_name(entry.name());
6505            let item_type = entry.item_type();
6506
6507            let proposed_name = QualifiedItemName {
6508                qualifiers: entry.name().qualifiers.clone(),
6509                item: to_item_name.clone().into_string(),
6510            };
6511
6512            // For PostgreSQL compatibility, items and types cannot have
6513            // overlapping names in a variety of situations. See the comment on
6514            // `CatalogItemType::conflicts_with_type` for details.
6515            let conflicting_type_exists;
6516            let conflicting_item_exists;
6517            if item_type == CatalogItemType::Type {
6518                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6519                conflicting_item_exists = scx
6520                    .catalog
6521                    .get_item_by_name(&proposed_name)
6522                    .map(|item| item.item_type().conflicts_with_type())
6523                    .unwrap_or(false);
6524            } else {
6525                conflicting_type_exists = item_type.conflicts_with_type()
6526                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6527                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6528            };
6529            if conflicting_type_exists || conflicting_item_exists {
6530                sql_bail!("catalog item '{}' already exists", to_item_name);
6531            }
6532
6533            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6534                id: entry.id(),
6535                current_full_name: full_name,
6536                to_name: normalize::ident(to_item_name),
6537                object_type,
6538            }))
6539        }
6540        None => {
6541            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6542                name: name.to_ast_string_simple(),
6543                object_type,
6544            });
6545
6546            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6547        }
6548    }
6549}
6550
6551pub fn plan_alter_cluster_rename(
6552    scx: &mut StatementContext,
6553    object_type: ObjectType,
6554    name: Ident,
6555    to_name: Ident,
6556    if_exists: bool,
6557) -> Result<Plan, PlanError> {
6558    match resolve_cluster(scx, &name, if_exists)? {
6559        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6560            id: entry.id(),
6561            name: entry.name().to_string(),
6562            to_name: normalize::ident(to_name),
6563        })),
6564        None => {
6565            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6566                name: name.to_ast_string_simple(),
6567                object_type,
6568            });
6569
6570            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6571        }
6572    }
6573}
6574
6575pub fn plan_alter_cluster_swap<F>(
6576    scx: &mut StatementContext,
6577    name_a: Ident,
6578    name_b: Ident,
6579    gen_temp_suffix: F,
6580) -> Result<Plan, PlanError>
6581where
6582    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6583{
6584    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6585    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6586
6587    let check = |temp_suffix: &str| {
6588        let mut temp_name = ident!("mz_cluster_swap_");
6589        temp_name.append_lossy(temp_suffix);
6590        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6591            // Temp name does not exist, so we can use it.
6592            Err(CatalogError::UnknownCluster(_)) => true,
6593            // Temp name already exists, or the lookup failed for another reason.
6594            Ok(_) | Err(_) => false,
6595        }
6596    };
6597    let temp_suffix = gen_temp_suffix(&check)?;
6598    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6599
6600    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6601        id_a: cluster_a.id(),
6602        id_b: cluster_b.id(),
6603        name_a: name_a.into_string(),
6604        name_b: name_b.into_string(),
6605        name_temp,
6606    }))
6607}
6608
6609pub fn plan_alter_cluster_replica_rename(
6610    scx: &mut StatementContext,
6611    object_type: ObjectType,
6612    name: QualifiedReplica,
6613    to_item_name: Ident,
6614    if_exists: bool,
6615) -> Result<Plan, PlanError> {
6616    match resolve_cluster_replica(scx, &name, if_exists)? {
6617        Some((cluster, replica)) => {
6618            ensure_cluster_is_not_managed(scx, cluster.id())?;
6619            Ok(Plan::AlterClusterReplicaRename(
6620                AlterClusterReplicaRenamePlan {
6621                    cluster_id: cluster.id(),
6622                    replica_id: replica,
6623                    name: QualifiedReplica {
6624                        cluster: Ident::new(cluster.name())?,
6625                        replica: name.replica,
6626                    },
6627                    to_name: normalize::ident(to_item_name),
6628                },
6629            ))
6630        }
6631        None => {
6632            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6633                name: name.to_ast_string_simple(),
6634                object_type,
6635            });
6636
6637            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6638        }
6639    }
6640}
6641
6642pub fn describe_alter_object_swap(
6643    _: &StatementContext,
6644    _: AlterObjectSwapStatement,
6645) -> Result<StatementDesc, PlanError> {
6646    Ok(StatementDesc::new(None))
6647}
6648
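/// Plans `ALTER ... SWAP WITH ...`, which is supported for schemas and
/// clusters; an illustrative example:
///
/// ```sql
/// ALTER CLUSTER blue SWAP WITH green;
/// ```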
6649pub fn plan_alter_object_swap(
6650    scx: &mut StatementContext,
6651    stmt: AlterObjectSwapStatement,
6652) -> Result<Plan, PlanError> {
6653    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6654
6655    let AlterObjectSwapStatement {
6656        object_type,
6657        name_a,
6658        name_b,
6659    } = stmt;
6660    let object_type = object_type.into();
6661
6662    // We'll try 10 times to generate a temporary suffix.
6663    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6664        let mut attempts = 0;
6665        let name_temp = loop {
6666            attempts += 1;
6667            if attempts > 10 {
6668                tracing::warn!("Unable to generate temp id for swapping");
6669                sql_bail!("unable to swap!");
6670            }
6671
6672            // Call the provided closure to make sure this name is unique!
6673            let short_id = mz_ore::id_gen::temp_id();
6674            if check_fn(&short_id) {
6675                break short_id;
6676            }
6677        };
6678
6679        Ok(name_temp)
6680    };
6681
6682    match (object_type, name_a, name_b) {
6683        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6684            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6685        }
6686        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6687            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6688        }
6689        (object_type, _, _) => Err(PlanError::Unsupported {
6690            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6691            discussion_no: None,
6692        }),
6693    }
6694}
6695
6696pub fn describe_alter_retain_history(
6697    _: &StatementContext,
6698    _: AlterRetainHistoryStatement<Aug>,
6699) -> Result<StatementDesc, PlanError> {
6700    Ok(StatementDesc::new(None))
6701}
6702
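/// Plans `ALTER ... SET (RETAIN HISTORY ...)`; an illustrative example:
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '90d');
/// ```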
6703pub fn plan_alter_retain_history(
6704    scx: &StatementContext,
6705    AlterRetainHistoryStatement {
6706        object_type,
6707        if_exists,
6708        name,
6709        history,
6710    }: AlterRetainHistoryStatement<Aug>,
6711) -> Result<Plan, PlanError> {
6712    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6713}
6714
6715fn alter_retain_history(
6716    scx: &StatementContext,
6717    object_type: ObjectType,
6718    if_exists: bool,
6719    name: UnresolvedObjectName,
6720    history: Option<WithOptionValue<Aug>>,
6721) -> Result<Plan, PlanError> {
6722    let name = match (object_type, name) {
6723        (
6724            // View gets a special error below.
6725            ObjectType::View
6726            | ObjectType::MaterializedView
6727            | ObjectType::Table
6728            | ObjectType::Source
6729            | ObjectType::Index,
6730            UnresolvedObjectName::Item(name),
6731        ) => name,
6732        (object_type, _) => {
6733            sql_bail!("{object_type} does not support RETAIN HISTORY")
6734        }
6735    };
6736    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6737        Some(entry) => {
6738            let full_name = scx.catalog.resolve_full_name(entry.name());
6739            let item_type = entry.item_type();
6740
6741            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6742            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6743                return Err(PlanError::AlterViewOnMaterializedView(
6744                    full_name.to_string(),
6745                ));
6746            } else if object_type == ObjectType::View {
6747                sql_bail!("{object_type} does not support RETAIN HISTORY")
6748            } else if object_type != item_type {
6749                sql_bail!(
6750                    "\"{}\" is a {} not a {}",
6751                    full_name,
6752                    entry.item_type(),
6753                    format!("{object_type}").to_lowercase()
6754                )
6755            }
6756
6757            // Save the original value so we can write it back down in the create_sql catalog item.
6758            let (value, lcw) = match &history {
6759                Some(WithOptionValue::RetainHistoryFor(value)) => {
6760                    let window = OptionalDuration::try_from_value(value.clone())?;
6761                    (Some(value.clone()), window.0)
6762                }
6763                // None is RESET, so use the default CW.
6764                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6765                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6766            };
6767            let window = plan_retain_history(scx, lcw)?;
6768
6769            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6770                id: entry.id(),
6771                value,
6772                window,
6773                object_type,
6774            }))
6775        }
6776        None => {
6777            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6778                name: name.to_ast_string_simple(),
6779                object_type,
6780            });
6781
6782            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6783        }
6784    }
6785}
6786
6787pub fn describe_alter_secret_options(
6788    _: &StatementContext,
6789    _: AlterSecretStatement<Aug>,
6790) -> Result<StatementDesc, PlanError> {
6791    Ok(StatementDesc::new(None))
6792}
6793
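/// Plans `ALTER SECRET ... AS ...`; an illustrative example, with a
/// hypothetical secret name:
///
/// ```sql
/// ALTER SECRET kafka_password AS 'new-password';
/// ```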
6794pub fn plan_alter_secret(
6795    scx: &mut StatementContext,
6796    stmt: AlterSecretStatement<Aug>,
6797) -> Result<Plan, PlanError> {
6798    let AlterSecretStatement {
6799        name,
6800        if_exists,
6801        value,
6802    } = stmt;
6803    let object_type = ObjectType::Secret;
6804    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6805        Some(entry) => entry.id(),
6806        None => {
6807            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6808                name: name.to_string(),
6809                object_type,
6810            });
6811
6812            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6813        }
6814    };
6815
6816    let secret_as = query::plan_secret_as(scx, value)?;
6817
6818    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6819}
6820
6821pub fn describe_alter_connection(
6822    _: &StatementContext,
6823    _: AlterConnectionStatement<Aug>,
6824) -> Result<StatementDesc, PlanError> {
6825    Ok(StatementDesc::new(None))
6826}
6827
6828generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6829
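/// Plans `ALTER CONNECTION`; illustrative examples, with hypothetical names:
///
/// ```sql
/// ALTER CONNECTION kafka_conn SET (BROKER 'new-broker:9092') WITH (VALIDATE = true);
/// ALTER CONNECTION ssh_conn ROTATE KEYS;
/// ```
///
/// `ROTATE KEYS` must be the only action and is valid only for SSH connections.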
6830pub fn plan_alter_connection(
6831    scx: &StatementContext,
6832    stmt: AlterConnectionStatement<Aug>,
6833) -> Result<Plan, PlanError> {
6834    let AlterConnectionStatement {
6835        name,
6836        if_exists,
6837        actions,
6838        with_options,
6839    } = stmt;
6840    let conn_name = normalize::unresolved_item_name(name)?;
6841    let entry = match scx.catalog.resolve_item(&conn_name) {
6842        Ok(entry) => entry,
6843        Err(_) if if_exists => {
6844            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6845                name: conn_name.to_string(),
6846                object_type: ObjectType::Sink,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Collect all options irrespective of action taken on them.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition the actions into set and drop operations.
    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Type check the values and catch duplicates: we don't want to let users,
    // e.g., both drop and set the same option in one statement, so it's fine
    // to treat drops as if they were sets here.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow setting _and_ resetting mutually exclusive options.
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option in the set is either set or dropped, drop all of the
        // set's mutually exclusive options "behind the scenes". Users may not
        // express this directly, but it is the mechanism by which we overwrite
        // values elsewhere in the code.
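        // For example (hypothetically, supposing BROKER and BROKERS formed a
        // mutually exclusive set): `SET (BROKER ...)` would implicitly drop
        // BROKERS, so the new value cleanly replaces whichever variant was
        // previously set.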
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

        // n.b. if mutually exclusive options are set, those will error when we
        // try to replan the connection.
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

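/// Plans `ALTER SINK`.
///
/// Only changing the sink's upstream relation is currently supported; a sketch
/// of the expected shape (names are illustrative):
///
/// ```sql
/// ALTER SINK my_sink SET FROM new_upstream;
/// ```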
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always ALTER objects from their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // First we reconstruct the original CREATE SINK statement.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Then we resolve the statement and swap its `from` relation for the new one.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            // Finally, we re-plan the modified CREATE SINK statement to verify the new configuration is valid.
            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

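            // The altered sink supersedes the current definition, so advance
            // its version.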
            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // TODO: should the supported options be described here?
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

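/// Plans `ALTER SOURCE`.
///
/// Most variants are handled during purification; the only action planned
/// directly here is adjusting the retain-history window, e.g. (a sketch;
/// names and durations are illustrative):
///
/// ```sql
/// ALTER SOURCE my_source SET (RETAIN HISTORY FOR '2h');
/// ALTER SOURCE my_source RESET (RETAIN HISTORY);
/// ```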
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options
                .next()
                .expect("parser guarantees at least one option");
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            // N.B.: we use this statement in purification in a way that cannot
            // be planned directly.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options
                .next()
                .expect("parser guarantees at least one option");
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
7159            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

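/// Plans `ALTER SYSTEM SET`, which configures a system variable, e.g. (a
/// sketch; the parameter name is illustrative):
///
/// ```sql
/// ALTER SYSTEM SET max_tables = 1000;
/// ```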
pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

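/// Plans `ALTER ROLE`, which adjusts a role's attributes or default variables,
/// e.g. (a sketch; names are illustrative):
///
/// ```sql
/// ALTER ROLE accountant WITH INHERIT;
/// ALTER ROLE accountant SET cluster = 'analytics';
/// ```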
pub fn plan_alter_role(
    scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs, scx)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

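/// Plans `ALTER TABLE ... ADD COLUMN`, gated behind the
/// `ENABLE_ALTER_TABLE_ADD_COLUMN` feature flag. A sketch of the expected
/// shape (names are illustrative):
///
/// ```sql
/// ALTER TABLE t ADD COLUMN IF NOT EXISTS c text;
/// ```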
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                // Always add columns to the latest version of the item.
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // TODO(alter_table): Support non-nullable columns with default values.
    let column_type = scalar_type.nullable(true);
    // "Unresolve" our data type so we can later update the persisted create_sql.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

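/// Plans `COMMENT ON`, e.g. (a sketch; names are illustrative):
///
/// ```sql
/// COMMENT ON TABLE t IS 'inventory of widgets';
/// COMMENT ON COLUMN t.id IS 'the primary key';
/// ```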
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // TODO(parkmycar): Make max comment length configurable.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // Note: the `mz_comments` table uses an `Int4` for the column position, but in catalog
    // storage we store a `usize`, which would be a `UInt8`. We check that the conversion is
    // possible here because it's the easiest place to raise an error.
    //
    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

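/// Resolves `name` as an item of kind `object_type`, consulting the type
/// namespace when `object_type` is `Type` and the item namespace otherwise.
/// Returns `Ok(None)` when the item does not exist and `if_exists` is set,
/// which lets callers plan a no-op.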
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns an error if the given cluster is a managed cluster.
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}