mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, IcebergSinkConfigOption, Ident,
    IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, KeyConstraint,
    LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption,
    MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, NetworkPolicyOption,
    NetworkPolicyOptionName, NetworkPolicyRuleDefinition, NetworkPolicyRuleOption,
    NetworkPolicyRuleOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema,
    QualifiedReplica, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue,
    ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar,
    SourceErrorPolicy, SourceIncludeMetadata, SqlServerConfigOption, SqlServerConfigOptionName,
    Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption,
    TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName,
    UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
    WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
    PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
    Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
    WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
    transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

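// Matches the names of managed cluster replicas, e.g. `r1`, `r2`, ....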
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns have types with a meaningful Persist-level ordering.
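///
/// For example, for a relation with columns `(a, b)`, `PARTITION BY (b)` fails
/// the prefix check, while `PARTITION BY (a)` passes it and is then subject to
/// the `preserves_order` check on `a`'s type.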
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
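    // For example, `CREATE SCHEMA db.sch` names the target database
    // explicitly, while `CREATE SCHEMA sch` resolves against the active
    // database.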
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }
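
    // At this point `column_types` holds only the columns present in the
    // original version of the table; columns carrying a `Versioned` option
    // were recorded in `changes` and are replayed onto the
    // `VersionedRelationDesc` below, one relation version per change.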

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if *nullable && !*nulls_not_distinct {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`;
                                // skip this constraint, but keep processing the rest.
                                continue 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }
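
    // For example, `UNIQUE (a)` on a nullable column `a` does not produce a
    // key (NULL values are distinct), whereas `UNIQUE NULLS NOT DISTINCT (a)`
    // and `PRIMARY KEY (a)` both do.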

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
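
// An illustrative statement handled by `plan_create_table` (key constraints
// are unenforced and gated behind feature flags):
//
//     CREATE TABLE t (a int NOT NULL, b text DEFAULT 'b', UNIQUE (a));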

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);
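
// Note: `generate_extracted_config!` is assumed (per its uses below) to emit a
// `CreateSourceOptionExtracted` struct with one field per option (e.g.
// `timestamp_interval: Option<Duration>`) plus a `seen` field, along with a
// `TryFrom<Vec<CreateSourceOption<Aug>>>` impl.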

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources && in_cluster.replica_ids().len() > 1 {
        sql_bail!("cannot create webhook source in cluster with more than one replica")
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }
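
    // An illustrative validation expression (hypothetical names; syntax per
    // the webhook source docs):
    //
    //     CHECK (
    //         WITH (HEADERS, BODY AS request_body, SECRET my_secret)
    //         decode(headers->'x-signature', 'base64')
    //             = hmac(request_body, my_secret, 'sha256')
    //     )
    //
    // It references the request body and headers, so it passes the
    // `contains_column` check above.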

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }
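
    // For example, `INCLUDE HEADER 'x-request-id' AS request_id` maps that
    // header to a nullable `text` column named `request_id` (or `bytea` with
    // the `BYTES` option).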

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all options related to the
    // primary source output must be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // text/exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // text and exclude columns are already part of the source-exports and are only
        // included in these options for round-tripping of a `CREATE SOURCE` statement.
        // This should be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _, // purified into `start_offset`
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
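    // For example, `START OFFSET (0, 5)` starts partition 0 at offset 0 and
    // partition 1 at offset 5; list positions map to partition ids.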
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
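    // For example, `INCLUDE PARTITION AS kafka_partition, OFFSET` adds the
    // metadata columns `kafka_partition` and `offset`; `INCLUDE KEY` is
    // handled separately via the key envelope.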
    let metadata_columns = include_metadata
        .iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // handled below
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is overly conservative.
            // For example, in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable, but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

1346    // The KEY VALUE load generator is the only UPSERT source that
1347    // has no encoding but defaults to `INCLUDE KEY`.
1348    //
1349    // As discussed in
1350    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1351    // removing this special case amounts to deciding how to handle null keys
1352    // from sources in a holistic way. We aren't yet prepared to do this, so we
1353    // leave this special case in.
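    //
    // For illustration only (a hypothetical statement; the exact option syntax
    // may differ): something like
    //
    //     CREATE SOURCE kv FROM LOAD GENERATOR KEY VALUE (KEYS 16, ...) INCLUDE KEY
    //
    // is plannable even though no FORMAT clause (and hence no key encoding) is
    // given.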
1354    //
1355    // Note that this is safe because this generator is
1356    // 1. The only source with no encoding that can have its key included.
1357    // 2. Never produces null keys (or values, for that matter).
1358    let key_envelope_no_encoding = matches!(
1359        source_connection,
1360        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1361            load_generator: LoadGenerator::KeyValue(_),
1362            ..
1363        })
1364    );
1365    let mut key_envelope = get_key_envelope(
1366        include_metadata,
1367        encoding.as_ref(),
1368        key_envelope_no_encoding,
1369    )?;
1370
1371    match (&envelope, &key_envelope) {
1372        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1373        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1374            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1375        ),
1376        _ => {}
1377    };
1378
1379    // Not all source envelopes are compatible with all source connections.
1380    // Whoever constructs the source ingestion pipeline is responsible for
1381    // choosing compatible envelopes and connections.
1382    //
1383    // TODO(guswynn): unambiguously assert which connections and envelopes are
1384    // compatible during typechecking
1385    //
1386    // TODO: remove bails as more support for upsert is added.
1387    let envelope = match &envelope {
1388        // TODO: fixup key envelope
1389        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1390        ast::SourceEnvelope::Debezium => {
1391            // TODO: check that key envelope is not set
1392            let after_idx = match typecheck_debezium(&value_desc) {
1393                Ok((_before_idx, after_idx)) => Ok(after_idx),
1394                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1395                    Some(DataEncoding::Avro(_)) => Err(type_err),
1396                    _ => Err(sql_err!(
1397                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1398                    )),
1399                },
1400            }?;
1401
1402            UnplannedSourceEnvelope::Upsert {
1403                style: UpsertStyle::Debezium { after_idx },
1404            }
1405        }
1406        ast::SourceEnvelope::Upsert {
1407            value_decode_err_policy,
1408        } => {
1409            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1410                None => {
1411                    if !key_envelope_no_encoding {
1412                        bail_unsupported!(format!(
1413                            "UPSERT requires a key/value format: {:?}",
1414                            format
1415                        ))
1416                    }
1417                    None
1418                }
1419                Some(key_encoding) => Some(key_encoding),
1420            };
1421            // `ENVELOPE UPSERT` implies `INCLUDE KEY` if it is not explicitly
1422            // specified.
1423            if key_envelope == KeyEnvelope::None {
1424                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1425            }
1426            // If the value decode error policy is not set, we use the default upsert style.
1427            let style = match value_decode_err_policy.as_slice() {
1428                [] => UpsertStyle::Default(key_envelope),
1429                [SourceErrorPolicy::Inline { alias }] => {
1430                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1431                    UpsertStyle::ValueErrInline {
1432                        key_envelope,
1433                        error_column: alias
1434                            .as_ref()
1435                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1436                    }
1437                }
1438                _ => {
1439                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1440                }
1441            };
1442
1443            UnplannedSourceEnvelope::Upsert { style }
1444        }
1445        ast::SourceEnvelope::CdcV2 => {
1446            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1447            // TODO: check that key envelope is not set
1448            match format {
1449                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1450                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1451            }
1452            UnplannedSourceEnvelope::CdcV2
1453        }
1454    };
1455
1456    let metadata_desc = included_column_desc(metadata_columns_desc);
1457    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1458
1459    Ok((desc, envelope, encoding))
1460}
1461
1462/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1463/// of columns and constraints.
1464fn plan_source_export_desc(
1465    scx: &StatementContext,
1466    name: &UnresolvedItemName,
1467    columns: &Vec<ColumnDef<Aug>>,
1468    constraints: &Vec<TableConstraint<Aug>>,
1469) -> Result<RelationDesc, PlanError> {
1470    let names: Vec<_> = columns
1471        .iter()
1472        .map(|c| normalize::column_name(c.name.clone()))
1473        .collect();
1474
1475    if let Some(dup) = names.iter().duplicates().next() {
1476        sql_bail!("column {} specified more than once", dup.quoted());
1477    }
1478
1479    // Build initial relation type that handles declared data types
1480    // and NOT NULL constraints.
1481    let mut column_types = Vec::with_capacity(columns.len());
1482    let mut keys = Vec::new();
1483
1484    for (i, c) in columns.into_iter().enumerate() {
1485        let aug_data_type = &c.data_type;
1486        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1487        let mut nullable = true;
1488        for option in &c.options {
1489            match &option.option {
1490                ColumnOption::NotNull => nullable = false,
1491                ColumnOption::Default(_) => {
1492                    bail_unsupported!("Source export with default value")
1493                }
1494                ColumnOption::Unique { is_primary } => {
1495                    keys.push(vec![i]);
1496                    if *is_primary {
1497                        nullable = false;
1498                    }
1499                }
1500                other => {
1501                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1502                }
1503            }
1504        }
1505        column_types.push(ty.nullable(nullable));
1506    }
1507
1508    let mut seen_primary = false;
1509    'c: for constraint in constraints {
1510        match constraint {
1511            TableConstraint::Unique {
1512                name: _,
1513                columns,
1514                is_primary,
1515                nulls_not_distinct,
1516            } => {
1517                if seen_primary && *is_primary {
1518                    sql_bail!(
1519                        "multiple primary keys for source export {} are not allowed",
1520                        name.to_ast_string_stable()
1521                    );
1522                }
1523                seen_primary = *is_primary || seen_primary;
1524
1525                let mut key = vec![];
1526                for column in columns {
1527                    let column = normalize::column_name(column.clone());
1528                    match names.iter().position(|name| *name == column) {
1529                        None => sql_bail!("unknown column in constraint: {}", column),
1530                        Some(i) => {
1531                            let nullable = &mut column_types[i].nullable;
1532                            if *is_primary {
1533                                if *nulls_not_distinct {
1534                                    sql_bail!(
1535                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1536                                    );
1537                                }
1538                                *nullable = false;
1539                            } else if !(*nulls_not_distinct || !*nullable) {
1540                                // Non-primary key unique constraints are only keys if all of their
1541                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1542                                break 'c;
1543                            }
1544
1545                            key.push(i);
1546                        }
1547                    }
1548                }
1549
1550                if *is_primary {
1551                    keys.insert(0, key);
1552                } else {
1553                    keys.push(key);
1554                }
1555            }
1556            TableConstraint::ForeignKey { .. } => {
1557                bail_unsupported!("Source export with a foreign key")
1558            }
1559            TableConstraint::Check { .. } => {
1560                bail_unsupported!("Source export with a check constraint")
1561            }
1562        }
1563    }
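    // Illustrative example: for columns `(a int NOT NULL, b int)` with
    // `PRIMARY KEY (a), UNIQUE (b)`, the primary key is inserted at `keys[0]`,
    // while the UNIQUE constraint on the nullable column `b` is not usable as a
    // key (its columns are neither `NOT NULL` nor `NULLS NOT DISTINCT`), so the
    // loop above bails out of key collection at that point.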
1564
1565    let typ = SqlRelationType::new(column_types).with_keys(keys);
1566    let desc = RelationDesc::new(typ, names);
1567    Ok(desc)
1568}
1569
1570generate_extracted_config!(
1571    CreateSubsourceOption,
1572    (Progress, bool, Default(false)),
1573    (ExternalReference, UnresolvedItemName),
1574    (RetainHistory, OptionalDuration),
1575    (TextColumns, Vec::<Ident>, Default(vec![])),
1576    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1577    (Details, String)
1578);
1579
1580pub fn plan_create_subsource(
1581    scx: &StatementContext,
1582    stmt: CreateSubsourceStatement<Aug>,
1583) -> Result<Plan, PlanError> {
1584    let CreateSubsourceStatement {
1585        name,
1586        columns,
1587        of_source,
1588        constraints,
1589        if_not_exists,
1590        with_options,
1591    } = &stmt;
1592
1593    let CreateSubsourceOptionExtracted {
1594        progress,
1595        retain_history,
1596        external_reference,
1597        text_columns,
1598        exclude_columns,
1599        details,
1600        seen: _,
1601    } = with_options.clone().try_into()?;
1602
1603    // This invariant is enforced during purification; we are responsible for
1604    // creating the AST for subsources as a response to CREATE SOURCE
1605    // statements, so this would fire in integration testing if we failed to
1606    // uphold it.
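    //
    // Concretely: a progress subsource sets only `PROGRESS`, while an export
    // subsource sets both an external reference and `OF SOURCE`; the xor below
    // rejects every other combination.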
1607    assert!(
1608        progress ^ (external_reference.is_some() && of_source.is_some()),
1609        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1610    );
1611
1612    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1613
1614    let data_source = if let Some(source_reference) = of_source {
1615        // If the new source table syntax is forced, we should not be creating any
1616        // non-progress subsources.
1617        if scx.catalog.system_vars().enable_create_table_from_source()
1618            && scx.catalog.system_vars().force_source_table_syntax()
1619        {
1620            Err(PlanError::UseTablesForSources(
1621                "CREATE SUBSOURCE".to_string(),
1622            ))?;
1623        }
1624
1625        // This is a subsource with the "natural" dependency order, i.e. it is
1626        // not a legacy subsource with the inverted structure.
1627        let ingestion_id = *source_reference.item_id();
1628        let external_reference = external_reference.unwrap();
1629
1630        // Decode the details option stored on the subsource statement, which contains information
1631        // created during the purification process.
1632        let details = details
1633            .as_ref()
1634            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1635        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1636        let details =
1637            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1638        let details =
1639            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1640        let details = match details {
1641            SourceExportStatementDetails::Postgres { table } => {
1642                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1643                    column_casts: crate::pure::postgres::generate_column_casts(
1644                        scx,
1645                        &table,
1646                        &text_columns,
1647                    )?,
1648                    table,
1649                })
1650            }
1651            SourceExportStatementDetails::MySql {
1652                table,
1653                initial_gtid_set,
1654            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1655                table,
1656                initial_gtid_set,
1657                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1658                exclude_columns: exclude_columns
1659                    .into_iter()
1660                    .map(|c| c.into_string())
1661                    .collect(),
1662            }),
1663            SourceExportStatementDetails::SqlServer {
1664                table,
1665                capture_instance,
1666                initial_lsn,
1667            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1668                capture_instance,
1669                table,
1670                initial_lsn,
1671                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1672                exclude_columns: exclude_columns
1673                    .into_iter()
1674                    .map(|c| c.into_string())
1675                    .collect(),
1676            }),
1677            SourceExportStatementDetails::LoadGenerator { output } => {
1678                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1679            }
1680            SourceExportStatementDetails::Kafka {} => {
1681                bail_unsupported!("subsources cannot reference Kafka sources")
1682            }
1683        };
1684        DataSourceDesc::IngestionExport {
1685            ingestion_id,
1686            external_reference,
1687            details,
1688            // Subsources don't currently support non-default envelopes / encoding
1689            data_config: SourceExportDataConfig {
1690                envelope: SourceEnvelope::None(NoneEnvelope {
1691                    key_envelope: KeyEnvelope::None,
1692                    key_arity: 0,
1693                }),
1694                encoding: None,
1695            },
1696        }
1697    } else if progress {
1698        DataSourceDesc::Progress
1699    } else {
1700        panic!("subsources must specify either `of_source` (with an external reference) or `progress`")
1701    };
1702
1703    let if_not_exists = *if_not_exists;
1704    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1705
1706    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1707
1708    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1709    let source = Source {
1710        create_sql,
1711        data_source,
1712        desc,
1713        compaction_window,
1714    };
1715
1716    Ok(Plan::CreateSource(CreateSourcePlan {
1717        name,
1718        source,
1719        if_not_exists,
1720        timeline: Timeline::EpochMilliseconds,
1721        in_cluster: None,
1722    }))
1723}
1724
1725generate_extracted_config!(
1726    TableFromSourceOption,
1727    (TextColumns, Vec::<Ident>, Default(vec![])),
1728    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1729    (PartitionBy, Vec<Ident>),
1730    (RetainHistory, OptionalDuration),
1731    (Details, String)
1732);
1733
1734pub fn plan_create_table_from_source(
1735    scx: &StatementContext,
1736    stmt: CreateTableFromSourceStatement<Aug>,
1737) -> Result<Plan, PlanError> {
1738    if !scx.catalog.system_vars().enable_create_table_from_source() {
1739        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1740    }
1741
1742    let CreateTableFromSourceStatement {
1743        name,
1744        columns,
1745        constraints,
1746        if_not_exists,
1747        source,
1748        external_reference,
1749        envelope,
1750        format,
1751        include_metadata,
1752        with_options,
1753    } = &stmt;
1754
1755    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1756
1757    let TableFromSourceOptionExtracted {
1758        text_columns,
1759        exclude_columns,
1760        retain_history,
1761        partition_by,
1762        details,
1763        seen: _,
1764    } = with_options.clone().try_into()?;
1765
1766    let source_item = scx.get_item_by_resolved_name(source)?;
1767    let ingestion_id = source_item.id();
1768
1769    // Decode the details option stored on the statement, which contains information
1770    // created during the purification process.
1771    let details = details
1772        .as_ref()
1773        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1774    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1775    let details =
1776        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1777    let details =
1778        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1779
1780    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1781        && include_metadata
1782            .iter()
1783            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1784    {
1785        // TODO(guswynn): should this be `bail_unsupported!`?
1786        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1787    }
1788    if !matches!(
1789        details,
1790        SourceExportStatementDetails::Kafka { .. }
1791            | SourceExportStatementDetails::LoadGenerator { .. }
1792    ) && !include_metadata.is_empty()
1793    {
1794        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1795    }
1796
1797    let details = match details {
1798        SourceExportStatementDetails::Postgres { table } => {
1799            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1800                column_casts: crate::pure::postgres::generate_column_casts(
1801                    scx,
1802                    &table,
1803                    &text_columns,
1804                )?,
1805                table,
1806            })
1807        }
1808        SourceExportStatementDetails::MySql {
1809            table,
1810            initial_gtid_set,
1811        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1812            table,
1813            initial_gtid_set,
1814            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1815            exclude_columns: exclude_columns
1816                .into_iter()
1817                .map(|c| c.into_string())
1818                .collect(),
1819        }),
1820        SourceExportStatementDetails::SqlServer {
1821            table,
1822            capture_instance,
1823            initial_lsn,
1824        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1825            table,
1826            capture_instance,
1827            initial_lsn,
1828            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1829            exclude_columns: exclude_columns
1830                .into_iter()
1831                .map(|c| c.into_string())
1832                .collect(),
1833        }),
1834        SourceExportStatementDetails::LoadGenerator { output } => {
1835            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1836        }
1837        SourceExportStatementDetails::Kafka {} => {
1838            if !include_metadata.is_empty()
1839                && !matches!(
1840                    envelope,
1841                    ast::SourceEnvelope::Upsert { .. }
1842                        | ast::SourceEnvelope::None
1843                        | ast::SourceEnvelope::Debezium
1844                )
1845            {
1846                // TODO(guswynn): should this be `bail_unsupported!`?
1847                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1848            }
1849
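            // E.g. `INCLUDE TIMESTAMP AS ts, PARTITION, OFFSET` yields the
            // metadata columns ("ts", Timestamp), ("partition", Partition),
            // and ("offset", Offset).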
1850            let metadata_columns = include_metadata
1851                .into_iter()
1852                .flat_map(|item| match item {
1853                    SourceIncludeMetadata::Timestamp { alias } => {
1854                        let name = match alias {
1855                            Some(name) => name.to_string(),
1856                            None => "timestamp".to_owned(),
1857                        };
1858                        Some((name, KafkaMetadataKind::Timestamp))
1859                    }
1860                    SourceIncludeMetadata::Partition { alias } => {
1861                        let name = match alias {
1862                            Some(name) => name.to_string(),
1863                            None => "partition".to_owned(),
1864                        };
1865                        Some((name, KafkaMetadataKind::Partition))
1866                    }
1867                    SourceIncludeMetadata::Offset { alias } => {
1868                        let name = match alias {
1869                            Some(name) => name.to_string(),
1870                            None => "offset".to_owned(),
1871                        };
1872                        Some((name, KafkaMetadataKind::Offset))
1873                    }
1874                    SourceIncludeMetadata::Headers { alias } => {
1875                        let name = match alias {
1876                            Some(name) => name.to_string(),
1877                            None => "headers".to_owned(),
1878                        };
1879                        Some((name, KafkaMetadataKind::Headers))
1880                    }
1881                    SourceIncludeMetadata::Header {
1882                        alias,
1883                        key,
1884                        use_bytes,
1885                    } => Some((
1886                        alias.to_string(),
1887                        KafkaMetadataKind::Header {
1888                            key: key.clone(),
1889                            use_bytes: *use_bytes,
1890                        },
1891                    )),
1892                    SourceIncludeMetadata::Key { .. } => {
1893                        // handled below
1894                        None
1895                    }
1896                })
1897                .collect();
1898
1899            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1900        }
1901    };
1902
1903    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1904
1905    // Some source types (e.g. Postgres, MySQL, multi-output load generators) define a
1906    // value schema during purification and populate the statement's `columns` and
1907    // `constraints` fields, whereas other source types (e.g. Kafka, single-output load
1908    // generators) do not; for those we fall back to the source connection's default schema.
1909    let (key_desc, value_desc) =
1910        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1911            let columns = match columns {
1912                TableFromSourceColumns::Defined(columns) => columns,
1913                _ => unreachable!(),
1914            };
1915            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1916            (None, desc)
1917        } else {
1918            let key_desc = source_connection.default_key_desc();
1919            let value_desc = source_connection.default_value_desc();
1920            (Some(key_desc), value_desc)
1921        };
1922
1923    let metadata_columns_desc = match &details {
1924        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1925            metadata_columns, ..
1926        }) => kafka_metadata_columns_desc(metadata_columns),
1927        _ => vec![],
1928    };
1929
1930    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1931        scx,
1932        &envelope,
1933        format,
1934        key_desc,
1935        value_desc,
1936        include_metadata,
1937        metadata_columns_desc,
1938        source_connection,
1939    )?;
1940    if let TableFromSourceColumns::Named(col_names) = columns {
1941        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1942    }
1943
1944    let names: Vec<_> = desc.iter_names().cloned().collect();
1945    if let Some(dup) = names.iter().duplicates().next() {
1946        sql_bail!("column {} specified more than once", dup.quoted());
1947    }
1948
1949    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1950
1951    // Allow users to specify a timeline. If they do not, determine a default
1952    // timeline for the source.
1953    let timeline = match envelope {
1954        SourceEnvelope::CdcV2 => {
1955            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1956        }
1957        _ => Timeline::EpochMilliseconds,
1958    };
1959
1960    if let Some(partition_by) = partition_by {
1961        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1962        check_partition_by(&desc, partition_by)?;
1963    }
1964
1965    let data_source = DataSourceDesc::IngestionExport {
1966        ingestion_id,
1967        external_reference: external_reference
1968            .as_ref()
1969            .expect("populated in purification")
1970            .clone(),
1971        details,
1972        data_config: SourceExportDataConfig { envelope, encoding },
1973    };
1974
1975    let if_not_exists = *if_not_exists;
1976
1977    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1978
1979    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1980    let table = Table {
1981        create_sql,
1982        desc: VersionedRelationDesc::new(desc),
1983        temporary: false,
1984        compaction_window,
1985        data_source: TableDataSource::DataSource {
1986            desc: data_source,
1987            timeline,
1988        },
1989    };
1990
1991    Ok(Plan::CreateTable(CreateTablePlan {
1992        name,
1993        table,
1994        if_not_exists,
1995    }))
1996}
1997
1998generate_extracted_config!(
1999    LoadGeneratorOption,
2000    (TickInterval, Duration),
2001    (AsOf, u64, Default(0_u64)),
2002    (UpTo, u64, Default(u64::MAX)),
2003    (ScaleFactor, f64),
2004    (MaxCardinality, u64),
2005    (Keys, u64),
2006    (SnapshotRounds, u64),
2007    (TransactionalSnapshot, bool),
2008    (ValueSize, u64),
2009    (Seed, u64),
2010    (Partitions, u64),
2011    (BatchSize, u64)
2012);
2013
2014impl LoadGeneratorOptionExtracted {
2015    pub(super) fn ensure_only_valid_options(
2016        &self,
2017        loadgen: &ast::LoadGenerator,
2018    ) -> Result<(), PlanError> {
2019        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2020
2021        let mut options = self.seen.clone();
2022
2023        let permitted_options: &[_] = match loadgen {
2024            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2025            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2026            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2027            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2028            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2029            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2030            ast::LoadGenerator::KeyValue => &[
2031                TickInterval,
2032                Keys,
2033                SnapshotRounds,
2034                TransactionalSnapshot,
2035                ValueSize,
2036                Seed,
2037                Partitions,
2038                BatchSize,
2039            ],
2040        };
2041
2042        for o in permitted_options {
2043            options.remove(o);
2044        }
2045
2046        if !options.is_empty() {
2047            sql_bail!(
2048                "{} load generators do not support {} values",
2049                loadgen,
2050                options.iter().join(", ")
2051            )
2052        }
2053
2054        Ok(())
2055    }
2056}
2057
2058pub(crate) fn load_generator_ast_to_generator(
2059    scx: &StatementContext,
2060    loadgen: &ast::LoadGenerator,
2061    options: &[LoadGeneratorOption<Aug>],
2062    include_metadata: &[SourceIncludeMetadata],
2063) -> Result<LoadGenerator, PlanError> {
2064    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2065    extracted.ensure_only_valid_options(loadgen)?;
2066
2067    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2068        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2069    }
2070
2071    let load_generator = match loadgen {
2072        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2073        ast::LoadGenerator::Clock => {
2074            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2075            LoadGenerator::Clock
2076        }
2077        ast::LoadGenerator::Counter => {
2078            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2079            let LoadGeneratorOptionExtracted {
2080                max_cardinality, ..
2081            } = extracted;
2082            LoadGenerator::Counter { max_cardinality }
2083        }
2084        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2085        ast::LoadGenerator::Datums => {
2086            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2087            LoadGenerator::Datums
2088        }
2089        ast::LoadGenerator::Tpch => {
2090            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2091
2092            // Default to 0.01 scale factor (=10MB).
2093            let sf: f64 = scale_factor.unwrap_or(0.01);
2094            if !sf.is_finite() || sf < 0.0 {
2095                sql_bail!("unsupported scale factor {sf}");
2096            }
2097
2098            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2099                let total = (sf * multiplier).floor();
2100                let mut i = i64::try_cast_from(total)
2101                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2102                if i < 1 {
2103                    i = 1;
2104                }
2105                Ok(i)
2106            };
2107
2108            // The multiplications here are safely unchecked because they will
2109            // overflow to infinity, which will be caught by `f_to_i`.
2110            let count_supplier = f_to_i(10_000f64)?;
2111            let count_part = f_to_i(200_000f64)?;
2112            let count_customer = f_to_i(150_000f64)?;
2113            let count_orders = f_to_i(150_000f64 * 10f64)?;
2114            let count_clerk = f_to_i(1_000f64)?;
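            // Worked example: the default sf of 0.01 yields count_supplier = 100,
            // count_part = 2_000, count_customer = 1_500, count_orders = 15_000,
            // and count_clerk = 10 (each clamped to be at least 1).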
2115
2116            LoadGenerator::Tpch {
2117                count_supplier,
2118                count_part,
2119                count_customer,
2120                count_orders,
2121                count_clerk,
2122            }
2123        }
2124        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2125            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2126            let LoadGeneratorOptionExtracted {
2127                keys,
2128                snapshot_rounds,
2129                transactional_snapshot,
2130                value_size,
2131                tick_interval,
2132                seed,
2133                partitions,
2134                batch_size,
2135                ..
2136            } = extracted;
2137
2138            let mut include_offset = None;
2139            for im in include_metadata {
2140                match im {
2141                    SourceIncludeMetadata::Offset { alias } => {
2142                        include_offset = match alias {
2143                            Some(alias) => Some(alias.to_string()),
2144                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2145                        }
2146                    }
2147                    SourceIncludeMetadata::Key { .. } => continue,
2148
2149                    _ => {
2150                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2151                    }
2152                };
2153            }
2154
2155            let lgkv = KeyValueLoadGenerator {
2156                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2157                snapshot_rounds: snapshot_rounds
2158                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2159                // Defaults to true.
2160                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2161                value_size: value_size
2162                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2163                partitions: partitions
2164                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2165                tick_interval,
2166                batch_size: batch_size
2167                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2168                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2169                include_offset,
2170            };
2171
2172            if lgkv.keys == 0
2173                || lgkv.partitions == 0
2174                || lgkv.value_size == 0
2175                || lgkv.batch_size == 0
2176            {
2177                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2178            }
2179
2180            if lgkv.keys % lgkv.partitions != 0 {
2181                sql_bail!("KEYS must be a multiple of PARTITIONS")
2182            }
2183
2184            if lgkv.batch_size > lgkv.keys {
2185                sql_bail!("KEYS must be larger than BATCH SIZE")
2186            }
2187
2188            // This constraint simplifies the source implementation.
2189            // We can lift it later.
2190            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2191                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2192            }
2193
2194            if lgkv.snapshot_rounds == 0 {
2195                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2196            }
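            // Illustrative valid combination: KEYS 128, PARTITIONS 4, BATCH SIZE 8
            // (with nonzero VALUE SIZE and SNAPSHOT ROUNDS) passes all of the
            // above checks, since 128 % 4 == 0 and (128 / 4) % 8 == 0.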
2197
2198            LoadGenerator::KeyValue(lgkv)
2199        }
2200    };
2201
2202    Ok(load_generator)
2203}
2204
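/// Checks that a Debezium-style value has the expected shape: a required
/// `after` column, plus an optional `before` column that must be a record of
/// the same type as `after`. Returns the column indexes of `before` (if
/// present) and `after`.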
2205fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2206    let before = value_desc.get_by_name(&"before".into());
2207    let (after_idx, after_ty) = value_desc
2208        .get_by_name(&"after".into())
2209        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2210    let before_idx = if let Some((before_idx, before_ty)) = before {
2211        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2212            sql_bail!("'before' column must be of type record");
2213        }
2214        if before_ty != after_ty {
2215            sql_bail!("'before' type differs from 'after' column");
2216        }
2217        Some(before_idx)
2218    } else {
2219        None
2220    };
2221    Ok((before_idx, after_idx))
2222}
2223
2224fn get_encoding(
2225    scx: &StatementContext,
2226    format: &FormatSpecifier<Aug>,
2227    envelope: &ast::SourceEnvelope,
2228) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2229    let encoding = match format {
2230        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2231        FormatSpecifier::KeyValue { key, value } => {
2232            let key = {
2233                let encoding = get_encoding_inner(scx, key)?;
2234                Some(encoding.key.unwrap_or(encoding.value))
2235            };
2236            let value = get_encoding_inner(scx, value)?.value;
2237            SourceDataEncoding { key, value }
2238        }
2239    };
2240
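    // E.g. a bare `FORMAT JSON` provides no key encoding, so `ENVELOPE UPSERT`
    // with only `FORMAT JSON` (rather than `KEY FORMAT ... VALUE FORMAT ...`)
    // is rejected below.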
2241    let requires_keyvalue = matches!(
2242        envelope,
2243        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2244    );
2245    let is_keyvalue = encoding.key.is_some();
2246    if requires_keyvalue && !is_keyvalue {
2247        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2248    };
2249
2250    Ok(encoding)
2251}
2252
2253/// Determine the cluster ID to use for this item.
2254///
2255/// If `in_cluster` is `None`, we will update it to refer to the default cluster.
2256/// Because of this, do not normalize/canonicalize the create SQL statement
2257/// until after calling this function.
2258fn source_sink_cluster_config<'a, 'ctx>(
2259    scx: &'a StatementContext<'ctx>,
2260    in_cluster: &mut Option<ResolvedClusterName>,
2261) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2262    let cluster = match in_cluster {
2263        None => {
2264            let cluster = scx.catalog.resolve_cluster(None)?;
2265            *in_cluster = Some(ResolvedClusterName {
2266                id: cluster.id(),
2267                print_name: None,
2268            });
2269            cluster
2270        }
2271        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2272    };
2273
2274    Ok(cluster)
2275}
2276
2277generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2278
2279#[derive(Debug)]
2280pub struct Schema {
2281    pub key_schema: Option<String>,
2282    pub value_schema: String,
2283    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2284    pub confluent_wire_format: bool,
2285}
2286
2287fn get_encoding_inner(
2288    scx: &StatementContext,
2289    format: &Format<Aug>,
2290) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2291    let value = match format {
2292        Format::Bytes => DataEncoding::Bytes,
2293        Format::Avro(schema) => {
2294            let Schema {
2295                key_schema,
2296                value_schema,
2297                csr_connection,
2298                confluent_wire_format,
2299            } = match schema {
2300                // TODO(jldlaughlin): we need a way to pass in primary key information
2301                // when building a source from a string or file.
2302                AvroSchema::InlineSchema {
2303                    schema: ast::Schema { schema },
2304                    with_options,
2305                } => {
2306                    let AvroSchemaOptionExtracted {
2307                        confluent_wire_format,
2308                        ..
2309                    } = with_options.clone().try_into()?;
2310
2311                    Schema {
2312                        key_schema: None,
2313                        value_schema: schema.clone(),
2314                        csr_connection: None,
2315                        confluent_wire_format,
2316                    }
2317                }
2318                AvroSchema::Csr {
2319                    csr_connection:
2320                        CsrConnectionAvro {
2321                            connection,
2322                            seed,
2323                            key_strategy: _,
2324                            value_strategy: _,
2325                        },
2326                } => {
2327                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2328                    let csr_connection = match item.connection()? {
2329                        Connection::Csr(_) => item.id(),
2330                        _ => {
2331                            sql_bail!(
2332                                "{} is not a schema registry connection",
2333                                scx.catalog
2334                                    .resolve_full_name(item.name())
2335                                    .to_string()
2336                                    .quoted()
2337                            )
2338                        }
2339                    };
2340
2341                    if let Some(seed) = seed {
2342                        Schema {
2343                            key_schema: seed.key_schema.clone(),
2344                            value_schema: seed.value_schema.clone(),
2345                            csr_connection: Some(csr_connection),
2346                            confluent_wire_format: true,
2347                        }
2348                    } else {
2349                        unreachable!("CSR seed resolution should already have been called: Avro")
2350                    }
2351                }
2352            };
2353
2354            if let Some(key_schema) = key_schema {
2355                return Ok(SourceDataEncoding {
2356                    key: Some(DataEncoding::Avro(AvroEncoding {
2357                        schema: key_schema,
2358                        csr_connection: csr_connection.clone(),
2359                        confluent_wire_format,
2360                    })),
2361                    value: DataEncoding::Avro(AvroEncoding {
2362                        schema: value_schema,
2363                        csr_connection,
2364                        confluent_wire_format,
2365                    }),
2366                });
2367            } else {
2368                DataEncoding::Avro(AvroEncoding {
2369                    schema: value_schema,
2370                    csr_connection,
2371                    confluent_wire_format,
2372                })
2373            }
2374        }
2375        Format::Protobuf(schema) => match schema {
2376            ProtobufSchema::Csr {
2377                csr_connection:
2378                    CsrConnectionProtobuf {
2379                        connection:
2380                            CsrConnection {
2381                                connection,
2382                                options,
2383                            },
2384                        seed,
2385                    },
2386            } => {
2387                if let Some(CsrSeedProtobuf { key, value }) = seed {
2388                    let item = scx.get_item_by_resolved_name(connection)?;
2389                    let _ = match item.connection()? {
2390                        Connection::Csr(connection) => connection,
2391                        _ => {
2392                            sql_bail!(
2393                                "{} is not a schema registry connection",
2394                                scx.catalog
2395                                    .resolve_full_name(item.name())
2396                                    .to_string()
2397                                    .quoted()
2398                            )
2399                        }
2400                    };
2401
2402                    if !options.is_empty() {
2403                        sql_bail!("Protobuf CSR connections do not support any options");
2404                    }
2405
2406                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2407                        descriptors: strconv::parse_bytes(&value.schema)?,
2408                        message_name: value.message_name.clone(),
2409                        confluent_wire_format: true,
2410                    });
2411                    if let Some(key) = key {
2412                        return Ok(SourceDataEncoding {
2413                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2414                                descriptors: strconv::parse_bytes(&key.schema)?,
2415                                message_name: key.message_name.clone(),
2416                                confluent_wire_format: true,
2417                            })),
2418                            value,
2419                        });
2420                    }
2421                    value
2422                } else {
2423                    unreachable!("CSR seed resolution should already have been called: Proto")
2424                }
2425            }
2426            ProtobufSchema::InlineSchema {
2427                message_name,
2428                schema: ast::Schema { schema },
2429            } => {
2430                let descriptors = strconv::parse_bytes(schema)?;
2431
2432                DataEncoding::Protobuf(ProtobufEncoding {
2433                    descriptors,
2434                    message_name: message_name.to_owned(),
2435                    confluent_wire_format: false,
2436                })
2437            }
2438        },
2439        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2440            regex: mz_repr::adt::regex::Regex::new(regex, false)
2441                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2442        }),
2443        Format::Csv { columns, delimiter } => {
2444            let columns = match columns {
2445                CsvColumns::Header { names } => {
2446                    if names.is_empty() {
2447                        sql_bail!("[internal error] column spec should get names in purify")
2448                    }
2449                    ColumnSpec::Header {
2450                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2451                    }
2452                }
2453                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2454            };
2455            DataEncoding::Csv(CsvEncoding {
2456                columns,
2457                delimiter: u8::try_from(*delimiter)
2458                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2459            })
2460        }
2461        Format::Json { array: false } => DataEncoding::Json,
2462        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2463        Format::Text => DataEncoding::Text,
2464    };
2465    Ok(SourceDataEncoding { key: None, value })
2466}
2467
2468/// Extract the key envelope, if it is requested
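///
/// For example (illustrative): with a key format present, `INCLUDE KEY AS k`
/// yields `KeyEnvelope::Named("k")` and bare `INCLUDE KEY` defers to
/// `get_unnamed_key_envelope`; `INCLUDE KEY` with a bare `FORMAT` (no key
/// format) is an error.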
2469fn get_key_envelope(
2470    included_items: &[SourceIncludeMetadata],
2471    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2472    key_envelope_no_encoding: bool,
2473) -> Result<KeyEnvelope, PlanError> {
2474    let key_definition = included_items
2475        .iter()
2476        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2477    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2478        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2479            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2480            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2481            (Some(name), _) if key_envelope_no_encoding => {
2482                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2483            }
2484            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2485            (_, None) => {
2486                // `alias == None` means `INCLUDE KEY`;
2487                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
2488                // Both cases make sense with the same error message.
2489                sql_bail!(
2490                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2491                        got bare FORMAT"
2492                );
2493            }
2494        }
2495    } else {
2496        Ok(KeyEnvelope::None)
2497    }
2498}
2499
2500/// Gets the key envelope for a given key encoding when no name for the key has
2501/// been requested by the user.
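///
/// For example (illustrative): a `TEXT`-encoded key becomes a single column
/// named "key", while a composite key (e.g. an Avro record) is flattened into
/// its constituent columns.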
2502fn get_unnamed_key_envelope(
2503    key: Option<&DataEncoding<ReferencedConnection>>,
2504) -> Result<KeyEnvelope, PlanError> {
2505    // If the key is requested but comes from an unnamed type, then it gets the name "key".
2506    //
2507    // Otherwise, it gets the names of the columns in the type.
2508    let is_composite = match key {
2509        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2510        Some(
2511            DataEncoding::Avro(_)
2512            | DataEncoding::Csv(_)
2513            | DataEncoding::Protobuf(_)
2514            | DataEncoding::Regex { .. },
2515        ) => true,
2516        None => false,
2517    };
2518
2519    if is_composite {
2520        Ok(KeyEnvelope::Flattened)
2521    } else {
2522        Ok(KeyEnvelope::Named("key".to_string()))
2523    }
2524}
2525
2526pub fn describe_create_view(
2527    _: &StatementContext,
2528    _: CreateViewStatement<Aug>,
2529) -> Result<StatementDesc, PlanError> {
2530    Ok(StatementDesc::new(None))
2531}
2532
2533pub fn plan_view(
2534    scx: &StatementContext,
2535    def: &mut ViewDefinition<Aug>,
2536    temporary: bool,
2537) -> Result<(QualifiedItemName, View), PlanError> {
2538    let create_sql = normalize::create_statement(
2539        scx,
2540        Statement::CreateView(CreateViewStatement {
2541            if_exists: IfExistsBehavior::Error,
2542            temporary,
2543            definition: def.clone(),
2544        }),
2545    )?;
2546
2547    let ViewDefinition {
2548        name,
2549        columns,
2550        query,
2551    } = def;
2552
2553    let query::PlannedRootQuery {
2554        expr,
2555        mut desc,
2556        finishing,
2557        scope: _,
2558    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2559    // We get back a trivial finishing because `plan_root_query` applies the given finishing.
2560    // Note: Earlier, we considered persisting the finishing information with the view here
2561    // to help with database-issues#236. However, in the meantime, there might be a better
2562    // approach to solving database-issues#236:
2563    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2564    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2565        &finishing,
2566        expr.arity()
2567    ));
2568    if expr.contains_parameters()? {
2569        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2570    }
2571
2572    let dependencies = expr
2573        .depends_on()
2574        .into_iter()
2575        .map(|gid| scx.catalog.resolve_item_id(&gid))
2576        .collect();
2577
2578    let name = if temporary {
2579        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2580    } else {
2581        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2582    };
2583
2584    plan_utils::maybe_rename_columns(
2585        format!("view {}", scx.catalog.resolve_full_name(&name)),
2586        &mut desc,
2587        columns,
2588    )?;
2589    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2590
2591    if let Some(dup) = names.iter().duplicates().next() {
2592        sql_bail!("column {} specified more than once", dup.quoted());
2593    }
2594
2595    let view = View {
2596        create_sql,
2597        expr,
2598        dependencies,
2599        column_names: names,
2600        temporary,
2601    };
2602
2603    Ok((name, view))
2604}
2605
2606pub fn plan_create_view(
2607    scx: &StatementContext,
2608    mut stmt: CreateViewStatement<Aug>,
2609) -> Result<Plan, PlanError> {
2610    let CreateViewStatement {
2611        temporary,
2612        if_exists,
2613        definition,
2614    } = &mut stmt;
2615    let (name, view) = plan_view(scx, definition, *temporary)?;
2616
2617    // Override the statement-level IfExistsBehavior with Skip if this is
2618    // explicitly requested in the PlanContext (the default is `false`).
2619    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2620
2621    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2622        let if_exists = true;
2623        let cascade = false;
2624        let maybe_item_to_drop = plan_drop_item(
2625            scx,
2626            ObjectType::View,
2627            if_exists,
2628            definition.name.clone(),
2629            cascade,
2630        )?;
2631
2632        // Check if the new View depends on the item that we would be replacing.
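        // E.g. `CREATE OR REPLACE VIEW v AS SELECT ... FROM v` must be
        // rejected, since dropping the old `v` would leave the new definition
        // with a dangling reference.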
2633        if let Some(id) = maybe_item_to_drop {
2634            let dependencies = view.expr.depends_on();
2635            let invalid_drop = scx
2636                .get_item(&id)
2637                .global_ids()
2638                .any(|gid| dependencies.contains(&gid));
2639            if invalid_drop {
2640                let item = scx.catalog.get_item(&id);
2641                sql_bail!(
2642                    "cannot replace view {0}: depended upon by new {0} definition",
2643                    scx.catalog.resolve_full_name(item.name())
2644                );
2645            }
2646
2647            Some(id)
2648        } else {
2649            None
2650        }
2651    } else {
2652        None
2653    };
2654    let drop_ids = replace
2655        .map(|id| {
2656            scx.catalog
2657                .item_dependents(id)
2658                .into_iter()
2659                .map(|id| id.unwrap_item_id())
2660                .collect()
2661        })
2662        .unwrap_or_default();
2663
2664    // Check for an object in the catalog with this same name
2665    let full_name = scx.catalog.resolve_full_name(&name);
2666    let partial_name = PartialItemName::from(full_name.clone());
2667    // For PostgreSQL compatibility, we need to prevent creating views when
2668    // there is an existing object *or* type of the same name.
2669    if let (Ok(item), IfExistsBehavior::Error, false) = (
2670        scx.catalog.resolve_item_or_type(&partial_name),
2671        *if_exists,
2672        ignore_if_exists_errors,
2673    ) {
2674        return Err(PlanError::ItemAlreadyExists {
2675            name: full_name.to_string(),
2676            item_type: item.item_type(),
2677        });
2678    }
2679
2680    Ok(Plan::CreateView(CreateViewPlan {
2681        name,
2682        view,
2683        replace,
2684        drop_ids,
2685        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2686        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2687    }))
2688}
2689
2690pub fn describe_create_materialized_view(
2691    _: &StatementContext,
2692    _: CreateMaterializedViewStatement<Aug>,
2693) -> Result<StatementDesc, PlanError> {
2694    Ok(StatementDesc::new(None))
2695}
2696
2697pub fn describe_create_continual_task(
2698    _: &StatementContext,
2699    _: CreateContinualTaskStatement<Aug>,
2700) -> Result<StatementDesc, PlanError> {
2701    Ok(StatementDesc::new(None))
2702}
2703
2704pub fn describe_create_network_policy(
2705    _: &StatementContext,
2706    _: CreateNetworkPolicyStatement<Aug>,
2707) -> Result<StatementDesc, PlanError> {
2708    Ok(StatementDesc::new(None))
2709}
2710
2711pub fn describe_alter_network_policy(
2712    _: &StatementContext,
2713    _: AlterNetworkPolicyStatement<Aug>,
2714) -> Result<StatementDesc, PlanError> {
2715    Ok(StatementDesc::new(None))
2716}
2717
2718pub fn plan_create_materialized_view(
2719    scx: &StatementContext,
2720    mut stmt: CreateMaterializedViewStatement<Aug>,
2721) -> Result<Plan, PlanError> {
2722    let cluster_id =
2723        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2724    stmt.in_cluster = Some(ResolvedClusterName {
2725        id: cluster_id,
2726        print_name: None,
2727    });
2728
2729    let create_sql =
2730        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2731
2732    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2733    let name = scx.allocate_qualified_name(partial_name.clone())?;
2734
2735    let query::PlannedRootQuery {
2736        expr,
2737        mut desc,
2738        finishing,
2739        scope: _,
2740    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2741    // We get back a trivial finishing; see the comment in `plan_view`.
2742    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2743        &finishing,
2744        expr.arity()
2745    ));
2746    if expr.contains_parameters()? {
2747        return Err(PlanError::ParameterNotAllowed(
2748            "materialized views".to_string(),
2749        ));
2750    }
2751
2752    plan_utils::maybe_rename_columns(
2753        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2754        &mut desc,
2755        &stmt.columns,
2756    )?;
2757    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2758
2759    let MaterializedViewOptionExtracted {
2760        assert_not_null,
2761        partition_by,
2762        retain_history,
2763        refresh,
2764        seen: _,
2765    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2766
2767    if let Some(partition_by) = partition_by {
2768        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2769        check_partition_by(&desc, partition_by)?;
2770    }
2771
2772    let refresh_schedule = {
2773        let mut refresh_schedule = RefreshSchedule::default();
2774        let mut on_commits_seen = 0;
2775        for refresh_option_value in refresh {
2776            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2777                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2778            }
2779            match refresh_option_value {
2780                RefreshOptionValue::OnCommit => {
2781                    on_commits_seen += 1;
2782                }
2783                RefreshOptionValue::AtCreation => {
2784                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2785                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2786                }
2787                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2788                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2789                    let ecx = &ExprContext {
2790                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2791                        name: "REFRESH AT",
2792                        scope: &Scope::empty(),
2793                        relation_type: &SqlRelationType::empty(),
2794                        allow_aggregates: false,
2795                        allow_subqueries: false,
2796                        allow_parameters: false,
2797                        allow_windows: false,
2798                    };
2799                    let hir = plan_expr(ecx, &time)?.cast_to(
2800                        ecx,
2801                        CastContext::Assignment,
2802                        &SqlScalarType::MzTimestamp,
2803                    )?;
2804                    // (mz_now was purified away to a literal earlier)
2805                    let timestamp = hir
2806                        .into_literal_mz_timestamp()
2807                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2808                    refresh_schedule.ats.push(timestamp);
2809                }
2810                RefreshOptionValue::Every(RefreshEveryOptionValue {
2811                    interval,
2812                    aligned_to,
2813                }) => {
2814                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2815                    if interval.as_microseconds() <= 0 {
2816                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2817                    }
2818                    if interval.months != 0 {
2819                        // This limitation exists because we want Intervals to be cleanly
2820                        // convertible to a Unix epoch timestamp difference. When the interval
2821                        // involves months, this no longer holds, because months have variable
2822                        // lengths. See `Timestamp::round_up`.
2823                        sql_bail!("REFRESH interval must not involve units larger than days");
2824                    }
2825                    let interval = interval.duration()?;
2826                    if u64::try_from(interval.as_millis()).is_err() {
2827                        sql_bail!("REFRESH interval too large");
2828                    }
2829
2830                    let mut aligned_to = match aligned_to {
2831                        Some(aligned_to) => aligned_to,
2832                        None => {
2833                            soft_panic_or_log!(
2834                                "ALIGNED TO should have been filled in by purification"
2835                            );
2836                            sql_bail!(
2837                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2838                            )
2839                        }
2840                    };
2841
2842                    // Desugar the `aligned_to` expression
2843                    transform_ast::transform(scx, &mut aligned_to)?;
2844
2845                    let ecx = &ExprContext {
2846                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2847                        name: "REFRESH EVERY ... ALIGNED TO",
2848                        scope: &Scope::empty(),
2849                        relation_type: &SqlRelationType::empty(),
2850                        allow_aggregates: false,
2851                        allow_subqueries: false,
2852                        allow_parameters: false,
2853                        allow_windows: false,
2854                    };
2855                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2856                        ecx,
2857                        CastContext::Assignment,
2858                        &SqlScalarType::MzTimestamp,
2859                    )?;
2860                    // (mz_now was purified away to a literal earlier)
2861                    let aligned_to_const = aligned_to_hir
2862                        .into_literal_mz_timestamp()
2863                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2864
2865                    refresh_schedule.everies.push(RefreshEvery {
2866                        interval,
2867                        aligned_to: aligned_to_const,
2868                    });
2869                }
2870            }
2871        }
2872
2873        if on_commits_seen > 1 {
2874            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2875        }
2876        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2877            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2878        }
2879
2880        if refresh_schedule == RefreshSchedule::default() {
2881            None
2882        } else {
2883            Some(refresh_schedule)
2884        }
2885    };
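    // To illustrate the result (timestamps illustrative): `REFRESH EVERY '1 day'
    // ALIGNED TO '2024-01-01 00:00:00', REFRESH AT '2024-06-01 00:00:00'` yields
    // one entry in `everies` and one in `ats`, while a bare `REFRESH ON COMMIT`
    // leaves the schedule at its default, so `refresh_schedule` becomes `None`.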
2886
2887    let as_of = stmt.as_of.map(Timestamp::from);
2888    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2889    let mut non_null_assertions = assert_not_null
2890        .into_iter()
2891        .map(normalize::column_name)
2892        .map(|assertion_name| {
2893            column_names
2894                .iter()
2895                .position(|col| col == &assertion_name)
2896                .ok_or_else(|| {
2897                    sql_err!(
2898                        "column {} in ASSERT NOT NULL option not found",
2899                        assertion_name.quoted()
2900                    )
2901                })
2902        })
2903        .collect::<Result<Vec<_>, _>>()?;
2904    non_null_assertions.sort();
2905    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2906        let dup = &column_names[*dup];
2907        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2908    }
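    // For example, with columns (a, b, c), `ASSERT NOT NULL c, ASSERT NOT NULL a`
    // yields the sorted positions [0, 2]; naming the same column twice is
    // rejected by the duplicate check above.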
2909
2910    if let Some(dup) = column_names.iter().duplicates().next() {
2911        sql_bail!("column {} specified more than once", dup.quoted());
2912    }
2913
2914    // Override the statement-level IfExistsBehavior with Skip if this is
2915    // explicitly requested in the PlanContext (the default is `false`).
2916    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2917        Ok(true) => IfExistsBehavior::Skip,
2918        _ => stmt.if_exists,
2919    };
2920
2921    let mut replace = None;
2922    let mut if_not_exists = false;
2923    match if_exists {
2924        IfExistsBehavior::Replace => {
2925            let if_exists = true;
2926            let cascade = false;
2927            let replace_id = plan_drop_item(
2928                scx,
2929                ObjectType::MaterializedView,
2930                if_exists,
2931                partial_name.into(),
2932                cascade,
2933            )?;
2934
2935            // Check if the new Materialized View depends on the item that we would be replacing.
2936            if let Some(id) = replace_id {
2937                let dependencies = expr.depends_on();
2938                let invalid_drop = scx
2939                    .get_item(&id)
2940                    .global_ids()
2941                    .any(|gid| dependencies.contains(&gid));
2942                if invalid_drop {
2943                    let item = scx.catalog.get_item(&id);
2944                    sql_bail!(
2945                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2946                        scx.catalog.resolve_full_name(item.name())
2947                    );
2948                }
2949                replace = Some(id);
2950            }
2951        }
2952        IfExistsBehavior::Skip => if_not_exists = true,
2953        IfExistsBehavior::Error => (),
2954    }
2955    let drop_ids = replace
2956        .map(|id| {
2957            scx.catalog
2958                .item_dependents(id)
2959                .into_iter()
2960                .map(|id| id.unwrap_item_id())
2961                .collect()
2962        })
2963        .unwrap_or_default();
2964    let dependencies = expr
2965        .depends_on()
2966        .into_iter()
2967        .map(|gid| scx.catalog.resolve_item_id(&gid))
2968        .collect();
2969
2970    // Check for an object in the catalog with this same name
2971    let full_name = scx.catalog.resolve_full_name(&name);
2972    let partial_name = PartialItemName::from(full_name.clone());
2973    // For PostgreSQL compatibility, we need to prevent creating materialized
2974    // views when there is an existing object *or* type of the same name.
2975    if let (IfExistsBehavior::Error, Ok(item)) =
2976        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
2977    {
2978        return Err(PlanError::ItemAlreadyExists {
2979            name: full_name.to_string(),
2980            item_type: item.item_type(),
2981        });
2982    }
2983
2984    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
2985        name,
2986        materialized_view: MaterializedView {
2987            create_sql,
2988            expr,
2989            dependencies,
2990            column_names,
2991            cluster_id,
2992            non_null_assertions,
2993            compaction_window,
2994            refresh_schedule,
2995            as_of,
2996        },
2997        replace,
2998        drop_ids,
2999        if_not_exists,
3000        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3001    }))
3002}
3003
3004generate_extracted_config!(
3005    MaterializedViewOption,
3006    (AssertNotNull, Ident, AllowMultiple),
3007    (PartitionBy, Vec<Ident>),
3008    (RetainHistory, OptionalDuration),
3009    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
3010);
3011
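/// Plans a `CREATE CONTINUAL TASK` statement.
///
/// A hedged sketch of the (feature-flagged) unsugared form this handler
/// accepts; names and types are illustrative:
///
/// ```sql
/// CREATE CONTINUAL TASK ct (key int, val int) ON INPUT t AS (
///     INSERT INTO ct SELECT key, val FROM t;
///     DELETE FROM ct WHERE key IN (SELECT key FROM t);
/// );
/// ```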
3012pub fn plan_create_continual_task(
3013    scx: &StatementContext,
3014    mut stmt: CreateContinualTaskStatement<Aug>,
3015) -> Result<Plan, PlanError> {
3016    match &stmt.sugar {
3017        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3018        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3019            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3020        }
3021        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3022            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3023        }
3024    };
3025    let cluster_id = match &stmt.in_cluster {
3026        None => scx.catalog.resolve_cluster(None)?.id(),
3027        Some(in_cluster) => in_cluster.id,
3028    };
3029    stmt.in_cluster = Some(ResolvedClusterName {
3030        id: cluster_id,
3031        print_name: None,
3032    });
3033
3034    let create_sql =
3035        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3036
3037    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3038
3039    // It seems desirable for a CT that e.g. simply filters the input to keep
3040    // the same nullability. So, start by assuming all columns are non-nullable,
3041    // and then make them nullable below if any of the exprs plan them as
3042    // nullable.
3043    let mut desc = match stmt.columns {
3044        None => None,
3045        Some(columns) => {
3046            let mut desc_columns = Vec::with_capacity(columns.len());
3047            for col in columns.iter() {
3048                desc_columns.push((
3049                    normalize::column_name(col.name.clone()),
3050                    SqlColumnType {
3051                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3052                        nullable: false,
3053                    },
3054                ));
3055            }
3056            Some(RelationDesc::from_names_and_types(desc_columns))
3057        }
3058    };
3059    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3060    match input.item_type() {
3061        // The input must be an object directly backed by a persist shard, so we
3062        // can use a persist listen to efficiently rehydrate.
3063        CatalogItemType::ContinualTask
3064        | CatalogItemType::Table
3065        | CatalogItemType::MaterializedView
3066        | CatalogItemType::Source => {}
3067        CatalogItemType::Sink
3068        | CatalogItemType::View
3069        | CatalogItemType::Index
3070        | CatalogItemType::Type
3071        | CatalogItemType::Func
3072        | CatalogItemType::Secret
3073        | CatalogItemType::Connection => {
3074            sql_bail!(
3075                "CONTINUAL TASK cannot use {} as an input",
3076                input.item_type()
3077            );
3078        }
3079    }
3080
3081    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3082    let ct_name = stmt.name;
3083    let placeholder_id = match &ct_name {
3084        ResolvedItemName::ContinualTask { id, name } => {
3085            let desc = match desc.as_ref().cloned() {
3086                Some(x) => x,
3087                None => {
3088                    // The user didn't specify the CT's columns. Take a wild
3089                    // guess that the CT has the same shape as the input. It's
3090                    // fine if this is wrong; we'll get an error below after
3091                    // planning the query.
3092                    let input_name = scx.catalog.resolve_full_name(input.name());
3093                    input.desc(&input_name)?.into_owned()
3094                }
3095            };
3096            qcx.ctes.insert(
3097                *id,
3098                CteDesc {
3099                    name: name.item.clone(),
3100                    desc,
3101                },
3102            );
3103            Some(*id)
3104        }
3105        _ => None,
3106    };
3107
3108    let mut exprs = Vec::new();
3109    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3110        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3111        let query::PlannedRootQuery {
3112            expr,
3113            desc: desc_query,
3114            finishing,
3115            scope: _,
3116        } = query::plan_ct_query(&mut qcx, query)?;
3117        // We get back a trivial finishing because we plan with a "maintained"
3118        // QueryLifetime, see comment in `plan_view`.
3119        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3120            &finishing,
3121            expr.arity()
3122        ));
3123        if expr.contains_parameters()? {
3124            return Err(PlanError::ParameterNotAllowed(
3125                "continual tasks".to_string(),
3126            ));
3127        }
3130        let expr = match desc.as_mut() {
3131            None => {
3132                desc = Some(desc_query);
3133                expr
3134            }
3135            Some(desc) => {
3136                // We specify the columns for DELETE, so if any column types don't
3137                // match, it must be an INSERT.
3138                if desc_query.arity() > desc.arity() {
3139                    sql_bail!(
3140                        "statement {}: INSERT has more expressions than target columns",
3141                        idx
3142                    );
3143                }
3144                if desc_query.arity() < desc.arity() {
3145                    sql_bail!(
3146                        "statement {}: INSERT has more target columns than expressions",
3147                        idx
3148                    );
3149                }
3150                // Ensure the types of the source query match the types of the target table,
3151                // installing assignment casts where necessary and possible.
3152                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3153                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3154                let expr = expr.map_err(|e| {
3155                    sql_err!(
3156                        "statement {}: column {} is of type {} but expression is of type {}",
3157                        idx,
3158                        desc.get_name(e.column).quoted(),
3159                        qcx.humanize_scalar_type(&e.target_type, false),
3160                        qcx.humanize_scalar_type(&e.source_type, false),
3161                    )
3162                })?;
3163
3164                // Update CT nullability as necessary. The arity checks above
3165                // verified that the two types have the same length.
3166                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3167                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3168                if updated {
3169                    let new_types = zip_types().map(|(ct, q)| {
3170                        let mut ct = ct.clone();
3171                        if q.nullable {
3172                            ct.nullable = true;
3173                        }
3174                        ct
3175                    });
3176                    *desc = RelationDesc::from_names_and_types(
3177                        desc.iter_names().cloned().zip_eq(new_types),
3178                    );
3179                }
3180
3181                expr
3182            }
3183        };
3184        match stmt {
3185            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3186            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3187        }
3188    }
3189    // TODO(ct3): Collect things by output and assert that there is only one (or
3190    // support multiple outputs).
3191    let expr = exprs
3192        .into_iter()
3193        .reduce(|acc, expr| acc.union(expr))
3194        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3195    let dependencies = expr
3196        .depends_on()
3197        .into_iter()
3198        .map(|gid| scx.catalog.resolve_item_id(&gid))
3199        .collect();
3200
3201    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3202    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3203    if let Some(dup) = column_names.iter().duplicates().next() {
3204        sql_bail!("column {} specified more than once", dup.quoted());
3205    }
3206
3207    // Check for an object in the catalog with this same name
3208    let name = match &ct_name {
3209        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3210        ResolvedItemName::ContinualTask { name, .. } => {
3211            let name = scx.allocate_qualified_name(name.clone())?;
3212            let full_name = scx.catalog.resolve_full_name(&name);
3213            let partial_name = PartialItemName::from(full_name.clone());
3214            // For PostgreSQL compatibility, we need to prevent creating this when there
3215            // is an existing object *or* type of the same name.
3216            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3217                return Err(PlanError::ItemAlreadyExists {
3218                    name: full_name.to_string(),
3219                    item_type: item.item_type(),
3220                });
3221            }
3222            name
3223        }
3224        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3225        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3226    };
3227
3228    let as_of = stmt.as_of.map(Timestamp::from);
3229    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3230        name,
3231        placeholder_id,
3232        desc,
3233        input_id: input.global_id(),
3234        with_snapshot: snapshot.unwrap_or(true),
3235        continual_task: MaterializedView {
3236            create_sql,
3237            expr,
3238            dependencies,
3239            column_names,
3240            cluster_id,
3241            non_null_assertions: Vec::new(),
3242            compaction_window: None,
3243            refresh_schedule: None,
3244            as_of,
3245        },
3246    }))
3247}
3248
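/// Extracts the `SELECT` query embedded in a continual task statement.
///
/// To illustrate the `DELETE` rewrite below (names illustrative): a task
/// statement like `DELETE FROM ct WHERE val < 0` is planned as
/// `SELECT * FROM ct WHERE val < 0`, and the caller negates the planned
/// expression to turn the selected rows into retractions.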
3249fn continual_task_query<'a>(
3250    ct_name: &ResolvedItemName,
3251    stmt: &'a ast::ContinualTaskStmt<Aug>,
3252) -> Option<ast::Query<Aug>> {
3253    match stmt {
3254        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3255            table_name: _,
3256            columns,
3257            source,
3258            returning,
3259        }) => {
3260            if !columns.is_empty() || !returning.is_empty() {
3261                return None;
3262            }
3263            match source {
3264                ast::InsertSource::Query(query) => Some(query.clone()),
3265                ast::InsertSource::DefaultValues => None,
3266            }
3267        }
3268        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3269            table_name: _,
3270            alias,
3271            using,
3272            selection,
3273        }) => {
3274            if !using.is_empty() {
3275                return None;
3276            }
3277            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
3278            let from = ast::TableWithJoins {
3279                relation: ast::TableFactor::Table {
3280                    name: ct_name.clone(),
3281                    alias: alias.clone(),
3282                },
3283                joins: Vec::new(),
3284            };
3285            let select = ast::Select {
3286                from: vec![from],
3287                selection: selection.clone(),
3288                distinct: None,
3289                projection: vec![ast::SelectItem::Wildcard],
3290                group_by: Vec::new(),
3291                having: None,
3292                qualify: None,
3293                options: Vec::new(),
3294            };
3295            let query = ast::Query {
3296                ctes: ast::CteBlock::Simple(Vec::new()),
3297                body: ast::SetExpr::Select(Box::new(select)),
3298                order_by: Vec::new(),
3299                limit: None,
3300                offset: None,
3301            };
3302            // Then negate it to turn it into retractions (after planning it).
3303            Some(query)
3304        }
3305    }
3306}
3307
3308generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3309
3310pub fn describe_create_sink(
3311    _: &StatementContext,
3312    _: CreateSinkStatement<Aug>,
3313) -> Result<StatementDesc, PlanError> {
3314    Ok(StatementDesc::new(None))
3315}
3316
3317generate_extracted_config!(
3318    CreateSinkOption,
3319    (Snapshot, bool),
3320    (PartitionStrategy, String),
3321    (Version, u64),
3322    (CommitInterval, Duration)
3323);
3324
3325pub fn plan_create_sink(
3326    scx: &StatementContext,
3327    stmt: CreateSinkStatement<Aug>,
3328) -> Result<Plan, PlanError> {
3329    // Check for an object in the catalog with this same name
3330    let Some(name) = stmt.name.clone() else {
3331        return Err(PlanError::MissingName(CatalogItemType::Sink));
3332    };
3333    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3334    let full_name = scx.catalog.resolve_full_name(&name);
3335    let partial_name = PartialItemName::from(full_name.clone());
3336    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3337        return Err(PlanError::ItemAlreadyExists {
3338            name: full_name.to_string(),
3339            item_type: item.item_type(),
3340        });
3341    }
3342
3343    plan_sink(scx, stmt)
3344}
3345
3346/// Plans a sink as if it does not exist in the catalog. This allows the planning
3347/// logic to be shared by both CREATE SINK and ALTER SINK planning. It is the
3348/// responsibility of the callers (plan_create_sink and plan_alter_sink) to check
3349/// for name collisions where that matters.
3350fn plan_sink(
3351    scx: &StatementContext,
3352    mut stmt: CreateSinkStatement<Aug>,
3353) -> Result<Plan, PlanError> {
3354    let CreateSinkStatement {
3355        name,
3356        in_cluster: _,
3357        from,
3358        connection,
3359        format,
3360        envelope,
3361        if_not_exists,
3362        with_options,
3363    } = stmt.clone();
3364
3365    let Some(name) = name else {
3366        return Err(PlanError::MissingName(CatalogItemType::Sink));
3367    };
3368    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3369
3370    let envelope = match envelope {
3371        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3372        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3373        None => sql_bail!("ENVELOPE clause is required"),
3374    };
3375
3376    let from_name = &from;
3377    let from = scx.get_item_by_resolved_name(&from)?;
3378    if from.id().is_system() {
3379        bail_unsupported!("creating a sink directly on a catalog object");
3380    }
3381    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
3382    let key_indices = match &connection {
3383        CreateSinkConnection::Kafka { key: Some(key), .. }
3384        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3385            let key_columns = key
3386                .key_columns
3387                .clone()
3388                .into_iter()
3389                .map(normalize::column_name)
3390                .collect::<Vec<_>>();
3391            let mut uniq = BTreeSet::new();
3392            for col in key_columns.iter() {
3393                if !uniq.insert(col) {
3394                    sql_bail!("duplicate column referenced in KEY: {}", col);
3395                }
3396            }
3397            let indices = key_columns
3398                .iter()
3399                .map(|col| -> anyhow::Result<usize> {
3400                    let name_idx =
3401                        desc.get_by_name(col)
3402                            .map(|(idx, _type)| idx)
3403                            .ok_or_else(|| {
3404                                sql_err!("column referenced in KEY does not exist: {}", col)
3405                            })?;
3406                    if desc.get_unambiguous_name(name_idx).is_none() {
3407                        sql_bail!("column referenced in KEY is ambiguous: {}", col);
3408                    }
3409                    Ok(name_idx)
3410                })
3411                .collect::<Result<Vec<_>, _>>()?;
3412            let is_valid_key = desc
3413                .typ()
3414                .keys
3415                .iter()
3416                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3417
3418            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3419                if key.not_enforced {
3420                    scx.catalog
3421                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3422                            key: key_columns.clone(),
3423                            name: name.item.clone(),
3424                        })
3425                } else {
3426                    return Err(PlanError::UpsertSinkWithInvalidKey {
3427                        name: from_name.full_name_str(),
3428                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3429                        valid_keys: desc
3430                            .typ()
3431                            .keys
3432                            .iter()
3433                            .map(|key| {
3434                                key.iter()
3435                                    .map(|col| desc.get_name(*col).as_str().into())
3436                                    .collect()
3437                            })
3438                            .collect(),
3439                    });
3440                }
3441            }
3442            Some(indices)
3443        }
3444        CreateSinkConnection::Kafka { key: None, .. }
3445        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3446    };
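    // To illustrate (names illustrative): sinking a relation whose only key is
    // (id) with `KEY (id)` passes this validation, while `KEY (name)` under
    // ENVELOPE UPSERT either errors or, with `KEY (name) NOT ENFORCED`, merely
    // emits a notice.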
3447
3448    let headers_index = match &connection {
3449        CreateSinkConnection::Kafka {
3450            headers: Some(headers),
3451            ..
3452        } => {
3453            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3454
3455            match envelope {
3456                SinkEnvelope::Upsert => (),
3457                SinkEnvelope::Debezium => {
3458                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3459                }
3460            };
3461
3462            let headers = normalize::column_name(headers.clone());
3463            let (idx, ty) = desc
3464                .get_by_name(&headers)
3465                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3466
3467            if desc.get_unambiguous_name(idx).is_none() {
3468                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3469            }
3470
3471            match &ty.scalar_type {
3472                SqlScalarType::Map { value_type, .. }
3473                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3474                _ => sql_bail!(
3475                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3476                ),
3477            }
3478
3479            Some(idx)
3480        }
3481        _ => None,
3482    };
3483
3484    // Pick the first valid natural relation key, if any.
3485    let relation_key_indices = desc.typ().keys.first().cloned();
3486
3487    let key_desc_and_indices = key_indices.map(|key_indices| {
3488        let cols = desc
3489            .iter()
3490            .map(|(name, ty)| (name.clone(), ty.clone()))
3491            .collect::<Vec<_>>();
3492        let (names, types): (Vec<_>, Vec<_>) =
3493            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3494        let typ = SqlRelationType::new(types);
3495        (RelationDesc::new(typ, names), key_indices)
3496    });
3497
3498    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3499        return Err(PlanError::UpsertSinkWithoutKey);
3500    }
3501
3502    let connection_builder = match connection {
3503        CreateSinkConnection::Kafka {
3504            connection,
3505            options,
3506            ..
3507        } => kafka_sink_builder(
3508            scx,
3509            connection,
3510            options,
3511            format,
3512            relation_key_indices,
3513            key_desc_and_indices,
3514            headers_index,
3515            desc.into_owned(),
3516            envelope,
3517            from.id(),
3518        )?,
3519        CreateSinkConnection::Iceberg {
3520            connection,
3521            aws_connection,
3522            options,
3523            ..
3524        } => iceberg_sink_builder(
3525            scx,
3526            connection,
3527            aws_connection,
3528            options,
3529            relation_key_indices,
3530            key_desc_and_indices,
3531        )?,
3532    };
3533
3534    let CreateSinkOptionExtracted {
3535        snapshot,
3536        version,
3537        partition_strategy: _,
3538        seen: _,
3539        commit_interval: _,
3540    } = with_options.try_into()?;
3541
3542    // WITH SNAPSHOT defaults to true
3543    let with_snapshot = snapshot.unwrap_or(true);
3544    // VERSION defaults to 0
3545    let version = version.unwrap_or(0);
3546
3547    // We will rewrite the cluster if one is not provided, so we must use the
3548    // `in_cluster` value we plan to normalize when we canonicalize the create
3549    // statement.
3550    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3551    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3552
3553    Ok(Plan::CreateSink(CreateSinkPlan {
3554        name,
3555        sink: Sink {
3556            create_sql,
3557            from: from.global_id(),
3558            connection: connection_builder,
3559            envelope,
3560            version,
3561        },
3562        with_snapshot,
3563        if_not_exists,
3564        in_cluster: in_cluster.id(),
3565    }))
3566}
3567
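/// Builds the error reported when a user-declared key conflicts with the
/// relation's existing keys. For example (illustrative), user keys `a, b`
/// against a relation whose only key is `(id)` render as:
/// `Key constraint (a, b) conflicts with existing key (id)`.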
3568fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3569    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3570
3571    let existing_keys = desc
3572        .typ()
3573        .keys
3574        .iter()
3575        .map(|key_columns| {
3576            key_columns
3577                .iter()
3578                .map(|col| desc.get_name(*col).as_str())
3579                .join(", ")
3580        })
3581        .join(", ");
3582
3583    sql_err!(
3584        "Key constraint ({}) conflicts with existing key ({})",
3585        user_keys,
3586        existing_keys
3587    )
3588}
3589
3590/// Created by hand instead of with the `generate_extracted_config!` macro
3591/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3592#[derive(Debug, Default, PartialEq, Clone)]
3593pub struct CsrConfigOptionExtracted {
3594    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3595    pub(crate) avro_key_fullname: Option<String>,
3596    pub(crate) avro_value_fullname: Option<String>,
3597    pub(crate) null_defaults: bool,
3598    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3599    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3600    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3601    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3602}
3603
3604impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3605    type Error = crate::plan::PlanError;
3606    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3607        let mut extracted = CsrConfigOptionExtracted::default();
3608        let mut common_doc_comments = BTreeMap::new();
3609        for option in v {
3610            if !extracted.seen.insert(option.name.clone()) {
3611                return Err(PlanError::Unstructured({
3612                    format!("{} specified more than once", option.name)
3613                }));
3614            }
3615            let option_name = option.name.clone();
3616            let option_name_str = option_name.to_ast_string_simple();
3617            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3618                option_name: option_name.to_ast_string_simple(),
3619                err: e.into(),
3620            };
3621            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3622                val.map(|s| match s {
3623                    WithOptionValue::Value(Value::String(s)) => {
3624                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3625                    }
3626                    _ => Err("must be a string".to_string()),
3627                })
3628                .transpose()
3629                .map_err(PlanError::Unstructured)
3630                .map_err(better_error)
3631            };
3632            match option.name {
3633                CsrConfigOptionName::AvroKeyFullname => {
3634                    extracted.avro_key_fullname =
3635                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3636                }
3637                CsrConfigOptionName::AvroValueFullname => {
3638                    extracted.avro_value_fullname =
3639                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3640                }
3641                CsrConfigOptionName::NullDefaults => {
3642                    extracted.null_defaults =
3643                        <bool>::try_from_value(option.value).map_err(better_error)?;
3644                }
3645                CsrConfigOptionName::AvroDocOn(doc_on) => {
3646                    let value = String::try_from_value(option.value.ok_or_else(|| {
3647                        PlanError::InvalidOptionValue {
3648                            option_name: option_name_str,
3649                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3650                        }
3651                    })?)
3652                    .map_err(better_error)?;
3653                    let key = match doc_on.identifier {
3654                        DocOnIdentifier::Column(ast::ColumnName {
3655                            relation: ResolvedItemName::Item { id, .. },
3656                            column: ResolvedColumnReference::Column { name, index: _ },
3657                        }) => DocTarget::Field {
3658                            object_id: id,
3659                            column_name: name,
3660                        },
3661                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3662                            DocTarget::Type(id)
3663                        }
3664                        _ => unreachable!(),
3665                    };
3666
3667                    match doc_on.for_schema {
3668                        DocOnSchema::KeyOnly => {
3669                            extracted.key_doc_options.insert(key, value);
3670                        }
3671                        DocOnSchema::ValueOnly => {
3672                            extracted.value_doc_options.insert(key, value);
3673                        }
3674                        DocOnSchema::All => {
3675                            common_doc_comments.insert(key, value);
3676                        }
3677                    }
3678                }
3679                CsrConfigOptionName::KeyCompatibilityLevel => {
3680                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3681                }
3682                CsrConfigOptionName::ValueCompatibilityLevel => {
3683                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3684                }
3685            }
3686        }
3687
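        // To illustrate the precedence implemented below (values illustrative):
        // a schema-wide `DOC ON COLUMN t.a = 'both'` only fills slots not
        // already claimed, so combined with `KEY DOC ON COLUMN t.a = 'key only'`
        // the key schema gets "key only" while the value schema falls back to
        // "both".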
3688        for (key, value) in common_doc_comments {
3689            if !extracted.key_doc_options.contains_key(&key) {
3690                extracted.key_doc_options.insert(key.clone(), value.clone());
3691            }
3692            if !extracted.value_doc_options.contains_key(&key) {
3693                extracted.value_doc_options.insert(key, value);
3694            }
3695        }
3696        Ok(extracted)
3697    }
3698}
3699
3700fn iceberg_sink_builder(
3701    scx: &StatementContext,
3702    catalog_connection: ResolvedItemName,
3703    aws_connection: ResolvedItemName,
3704    options: Vec<IcebergSinkConfigOption<Aug>>,
3705    relation_key_indices: Option<Vec<usize>>,
3706    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3707) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3708    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3709    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3710    let catalog_connection_id = catalog_connection_item.id();
3711    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3712    let aws_connection_id = aws_connection_item.id();
3713    if !matches!(
3714        catalog_connection_item.connection()?,
3715        Connection::IcebergCatalog(_)
3716    ) {
3717        sql_bail!(
3718            "{} is not an iceberg catalog connection",
3719            scx.catalog
3720                .resolve_full_name(catalog_connection_item.name())
3721                .to_string()
3722                .quoted()
3723        );
3724    };
3725
3726    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3727        sql_bail!(
3728            "{} is not an AWS connection",
3729            scx.catalog
3730                .resolve_full_name(aws_connection_item.name())
3731                .to_string()
3732                .quoted()
3733        );
3734    }
3735
3736    let IcebergSinkConfigOptionExtracted {
3737        table,
3738        namespace,
3739        seen: _,
3740    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3741
3742    let Some(table) = table else {
3743        sql_bail!("Iceberg sink must specify TABLE");
3744    };
3745    let Some(namespace) = namespace else {
3746        sql_bail!("Iceberg sink must specify NAMESPACE");
3747    };
3748
3749    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3750        catalog_connection_id,
3751        catalog_connection: catalog_connection_id,
3752        aws_connection_id,
3753        aws_connection: aws_connection_id,
3754        table,
3755        namespace,
3756        relation_key_indices,
3757        key_desc_and_indices,
3758    }))
3759}
3760
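/// Builds the storage description of a Kafka sink. For reference, a typical
/// statement routed here looks like the following (connection and column names
/// are illustrative):
///
/// ```sql
/// CREATE SINK s FROM my_view
///     INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-topic')
///     KEY (id)
///     FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
///     ENVELOPE UPSERT;
/// ```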
3761fn kafka_sink_builder(
3762    scx: &StatementContext,
3763    connection: ResolvedItemName,
3764    options: Vec<KafkaSinkConfigOption<Aug>>,
3765    format: Option<FormatSpecifier<Aug>>,
3766    relation_key_indices: Option<Vec<usize>>,
3767    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3768    headers_index: Option<usize>,
3769    value_desc: RelationDesc,
3770    envelope: SinkEnvelope,
3771    sink_from: CatalogItemId,
3772) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3773    // Get Kafka connection.
3774    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3775    let connection_id = connection_item.id();
3776    match connection_item.connection()? {
3777        Connection::Kafka(_) => (),
3778        _ => sql_bail!(
3779            "{} is not a kafka connection",
3780            scx.catalog.resolve_full_name(connection_item.name())
3781        ),
3782    };
3783
3784    let KafkaSinkConfigOptionExtracted {
3785        topic,
3786        compression_type,
3787        partition_by,
3788        progress_group_id_prefix,
3789        transactional_id_prefix,
3790        legacy_ids,
3791        topic_config,
3792        topic_metadata_refresh_interval,
3793        topic_partition_count,
3794        topic_replication_factor,
3795        seen: _,
3796    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3797
3798    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3799        (Some(_), Some(true)) => {
3800            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3801        }
3802        (None, Some(true)) => KafkaIdStyle::Legacy,
3803        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3804    };
3805
3806    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3807        (Some(_), Some(true)) => {
3808            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3809        }
3810        (None, Some(true)) => KafkaIdStyle::Legacy,
3811        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3812    };
3813
3814    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3815
3816    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3817        // This is a librdkafka-enforced restriction that, if violated,
3818        // would result in a runtime error for the sink.
3819        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3820    }
3821
3822    let assert_positive = |val: Option<i32>, name: &str| {
3823        if let Some(val) = val {
3824            if val <= 0 {
3825                sql_bail!("{} must be a positive integer", name);
3826            }
3827        }
3828        val.map(NonNeg::try_from)
3829            .transpose()
3830            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3831    };
3832    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3833    let topic_replication_factor =
3834        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3835
3836    // Helper closure to parse Avro connection options for format specifiers that
3837    // use Avro for either key or value encoding.
3838    let gen_avro_schema_options = |conn| {
3839        let CsrConnectionAvro {
3840            connection:
3841                CsrConnection {
3842                    connection,
3843                    options,
3844                },
3845            seed,
3846            key_strategy,
3847            value_strategy,
3848        } = conn;
3849        if seed.is_some() {
3850            sql_bail!("SEED option does not make sense with sinks");
3851        }
3852        if key_strategy.is_some() {
3853            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3854        }
3855        if value_strategy.is_some() {
3856            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3857        }
3858
3859        let item = scx.get_item_by_resolved_name(&connection)?;
3860        let csr_connection = match item.connection()? {
3861            Connection::Csr(_) => item.id(),
3862            _ => {
3863                sql_bail!(
3864                    "{} is not a schema registry connection",
3865                    scx.catalog
3866                        .resolve_full_name(item.name())
3867                        .to_string()
3868                        .quoted()
3869                )
3870            }
3871        };
3872        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3873
3874        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3875            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3876        }
3877
3878        if key_desc_and_indices.is_some()
3879            && (extracted_options.avro_key_fullname.is_some()
3880                ^ extracted_options.avro_value_fullname.is_some())
3881        {
3882            sql_bail!(
3883                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3884            );
3885        }
3886
3887        Ok((csr_connection, extracted_options))
3888    };
3889
3890    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3891        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3892        Format::Bytes if desc.arity() == 1 => {
3893            let col_type = &desc.typ().column_types[0].scalar_type;
3894            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3895                bail_unsupported!(format!(
3896                    "BYTES format with non-encodable type: {:?}",
3897                    col_type
3898                ));
3899            }
3900
3901            Ok(KafkaSinkFormatType::Bytes)
3902        }
3903        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3904        Format::Bytes | Format::Text => {
3905            bail_unsupported!("BYTES or TEXT format with multiple columns")
3906        }
3907        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3908        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3909            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3910            let schema = if is_key {
3911                AvroSchemaGenerator::new(
3912                    desc.clone(),
3913                    false,
3914                    options.key_doc_options,
3915                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3916                    options.null_defaults,
3917                    Some(sink_from),
3918                    false,
3919                )?
3920                .schema()
3921                .to_string()
3922            } else {
3923                AvroSchemaGenerator::new(
3924                    desc.clone(),
3925                    matches!(envelope, SinkEnvelope::Debezium),
3926                    options.value_doc_options,
3927                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3928                    options.null_defaults,
3929                    Some(sink_from),
3930                    true,
3931                )?
3932                .schema()
3933                .to_string()
3934            };
3935            Ok(KafkaSinkFormatType::Avro {
3936                schema,
3937                compatibility_level: if is_key {
3938                    options.key_compatibility_level
3939                } else {
3940                    options.value_compatibility_level
3941                },
3942                csr_connection,
3943            })
3944        }
3945        format => bail_unsupported!(format!("sink format {:?}", format)),
3946    };
3947
3948    let partition_by = match &partition_by {
3949        Some(partition_by) => {
3950            let mut scope = Scope::from_source(None, value_desc.iter_names());
3951
3952            match envelope {
3953                SinkEnvelope::Upsert => (),
3954                SinkEnvelope::Debezium => {
3955                    let key_indices: HashSet<_> = key_desc_and_indices
3956                        .as_ref()
3957                        .map(|(_desc, indices)| indices.as_slice())
3958                        .unwrap_or_default()
3959                        .into_iter()
3960                        .collect();
3961                    for (i, item) in scope.items.iter_mut().enumerate() {
3962                        if !key_indices.contains(&i) {
3963                            item.error_if_referenced = Some(|_table, column| {
3964                                PlanError::InvalidPartitionByEnvelopeDebezium {
3965                                    column_name: column.to_string(),
3966                                }
3967                            });
3968                        }
3969                    }
3970                }
3971            };
3972
3973            let ecx = &ExprContext {
3974                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3975                name: "PARTITION BY",
3976                scope: &scope,
3977                relation_type: value_desc.typ(),
3978                allow_aggregates: false,
3979                allow_subqueries: false,
3980                allow_parameters: false,
3981                allow_windows: false,
3982            };
3983            let expr = plan_expr(ecx, partition_by)?.cast_to(
3984                ecx,
3985                CastContext::Assignment,
3986                &SqlScalarType::UInt64,
3987            )?;
3988            let expr = expr.lower_uncorrelated()?;
3989
3990            Some(expr)
3991        }
3992        _ => None,
3993    };
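    // For example (illustrative): `PARTITION BY seahash(id::text)` plans an
    // expression castable to uint64; under ENVELOPE DEBEZIUM it may reference
    // only key columns, per the scope errors installed above.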
3994
3995    // Map from the format specifier of the statement to the individual key/value formats for the sink.
3996    let format = match format {
3997        Some(FormatSpecifier::KeyValue { key, value }) => {
3998            let key_format = match key_desc_and_indices.as_ref() {
3999                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4000                None => None,
4001            };
4002            KafkaSinkFormat {
4003                value_format: map_format(value, &value_desc, false)?,
4004                key_format,
4005            }
4006        }
4007        Some(FormatSpecifier::Bare(format)) => {
4008            let key_format = match key_desc_and_indices.as_ref() {
4009                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4010                None => None,
4011            };
4012            KafkaSinkFormat {
4013                value_format: map_format(format, &value_desc, false)?,
4014                key_format,
4015            }
4016        }
4017        None => bail_unsupported!("sink without format"),
4018    };
4019
4020    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4021        connection_id,
4022        connection: connection_id,
4023        format,
4024        topic: topic_name,
4025        relation_key_indices,
4026        key_desc_and_indices,
4027        headers_index,
4028        value_desc,
4029        partition_by,
4030        compression_type,
4031        progress_group_id,
4032        transactional_id,
4033        topic_options: KafkaTopicOptions {
4034            partition_count: topic_partition_count,
4035            replication_factor: topic_replication_factor,
4036            topic_config: topic_config.unwrap_or_default(),
4037        },
4038        topic_metadata_refresh_interval,
4039    }))
4040}

pub fn describe_create_index(
    _: &StatementContext,
    _: CreateIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;
    let on_item_type = on.item_type();

    if !matches!(
        on_item_type,
        CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Source
            | CatalogItemType::Table
    ) {
        sql_bail!(
            "index cannot be created on {} because it is a {}",
            on_name.full_name_str(),
            on.item_type()
        )
    }

    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // `key_parts` is None if we're creating a "default" index.
            on.desc(&scx.catalog.resolve_full_name(on.name()))?
                .typ()
                .default_key()
                .iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            // We're trying to create the "default" index.
            idx_name.item += "_primary_idx";
        } else {
            // Use the PG naming scheme for automatically naming indexes:
            // `<table>_<_-separated indexed expressions>_idx`
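            // For example (illustrative, not from the source): an index on
            // columns `a` and `b` of table `t` is named `t_a_b_idx`, and any
            // non-column expression contributes the literal segment "expr",
            // so an index on `(a, b + 1)` becomes `t_a_expr_idx`.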
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating indexes when
    // there is an existing object *or* type of the same name.
    //
    // Technically, we only need to prevent coexistence of indexes and types
    // that have an associated relation (record types but not list/map types).
    // Enforcing that would be more complicated, though. It's backwards
    // compatible to weaken this restriction in the future.
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Normalize `stmt`.
    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}

pub fn describe_create_type(
    _: &StatementContext,
    _: CreateTypeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate that the modifiers are actually valid.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }
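
    // Illustrative examples (not from the source) of what `validate_data_type`
    // accepts and rejects: `CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4)`
    // references a named type and is accepted, while an anonymous type such as
    // `ELEMENT TYPE = int4 list` is rejected with a hint to first create a
    // named type via CREATE TYPE.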

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating types when
    // there is an existing object *or* type of the same name.
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}

generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);

#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}

#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    pub password: Option<Password>,
    /// `nopassword` is set to true if the password from the parser is `None`.
    /// This is semantically different from not supplying a password at all,
    /// and allows unsetting a password.
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}

fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    planned_attributes.password = Some(password.into());
                } else {
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}
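
// Illustrative sketch (not from the source): `CREATE ROLE r SUPERUSER LOGIN`
// yields `PlannedRoleAttributes { superuser: Some(true), login: Some(true), .. }`,
// while a password option with no value (e.g. `PASSWORD NULL`) sets
// `nopassword: Some(true)` so an existing password can be unset. Contradictory
// pairs such as `SUPERUSER NOSUPERUSER` fail with "conflicting or redundant
// options".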

#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}

impl PlannedRoleVariable {
    pub fn name(&self) -> &str {
        match self {
            PlannedRoleVariable::Set { name, .. } => name,
            PlannedRoleVariable::Reset { name } => name,
        }
    }
}

fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
    let plan = match variable {
        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
            name: name.to_string(),
            value: scl::plan_set_variable_to(value)?,
        },
        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
            name: name.to_string(),
        },
    };
    Ok(plan)
}

pub fn describe_create_role(
    _: &StatementContext,
    _: CreateRoleStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_role(
    _: &StatementContext,
    CreateRoleStatement { name, options }: CreateRoleStatement,
) -> Result<Plan, PlanError> {
    let attributes = plan_role_attributes(options)?;
    Ok(Plan::CreateRole(CreateRolePlan {
        name: normalize::ident(name),
        attributes: attributes.into(),
    }))
}

pub fn plan_create_network_policy(
    ctx: &StatementContext,
    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when creating network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;
        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }

    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
        name: normalize::ident(name),
        rules,
    }))
}
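
// Illustrative example (not from the source): a minimal policy with a single
// ingress rule. Each rule must supply all of direction, action, and address:
//
//   CREATE NETWORK POLICY office_only (
//       RULES (
//           allow_office (DIRECTION = 'ingress', ACTION = 'allow', ADDRESS = '203.0.113.0/24')
//       )
//   );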

pub fn plan_alter_network_policy(
    ctx: &StatementContext,
    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;

    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when altering network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;

        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }
    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
        id: policy.id(),
        name: normalize::ident(name),
        rules,
    }))
}

pub fn describe_create_cluster(
    _: &StatementContext,
    _: CreateClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

// WARNING:
// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
// that option stays the same. If you were to give a default value here, then not giving that option
// to ALTER CLUSTER would always reset the value of that option to the default.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);

/// Convert a [`CreateClusterStatement`] into a [`Plan`].
///
/// The reverse of [`unplan_create_cluster`].
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    // Roundtrip through unplan and make sure that we end up with the same plan.
    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}

pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    let managed = managed.unwrap_or_else(|| replicas.is_none());

    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
            // we'll be able to remove the `DISK` option entirely.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // If we have a non-trivial schedule, then let's not have any replicas initially,
            // to avoid quickly going back and forth if the schedule doesn't want a replica
            // initially.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Plan OptimizerFeatureOverrides.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
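
// Illustrative examples (not from the source) of the managed/unmanaged split
// above. Omitting MANAGED defaults to managed exactly when REPLICAS is absent:
//
//   CREATE CLUSTER c1 (SIZE = '100cc');                  -- managed; SIZE required
//   CREATE CLUSTER c2 (REPLICAS (r1 (SIZE = '100cc')));  -- unmanaged; SIZE rejected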

/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
///
/// The reverse of [`plan_create_cluster`].
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_reduce_reduction: _,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
            } = optimizer_feature_overrides;
            // The ones from above that don't occur below are not wired up to cluster features.
            let features_extracted = ClusterFeatureExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // The replication factor cannot be explicitly specified with a refresh
            // schedule, so it's always 1 or less.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}

generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);

fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // Common cases we expect end users to hit.
        (None, _, None, None, None) => {
            // We don't mention the unmanaged options in the error message
            // because they are only available in unsafe mode.
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            // When manually testing Materialize in unsafe mode, it's easy to
            // accidentally omit one of these options, so we try to produce
            // helpful error messages.
            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            // We don't bother trying to produce a more helpful error message
            // here because no user is likely to hit this path.
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}

/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
///
/// The reverse of [`unplan_compute_replica_config`].
fn plan_compute_replica_config(
    introspection_interval: Option<OptionalDuration>,
    introspection_debugging: bool,
) -> Result<ComputeReplicaConfig, PlanError> {
    let introspection_interval = introspection_interval
        .map(|OptionalDuration(i)| i)
        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
    let introspection = match introspection_interval {
        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
            interval,
            debugging: introspection_debugging,
        }),
        None if introspection_debugging => {
            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
        }
        None => None,
    };
    let compute = ComputeReplicaConfig { introspection };
    Ok(compute)
}
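
// Illustrative behavior sketch (not from the source): an omitted
// INTROSPECTION INTERVAL falls back to DEFAULT_REPLICA_LOGGING_INTERVAL; an
// explicitly null interval (`OptionalDuration(None)`) disables introspection
// entirely; and INTROSPECTION DEBUGGING without an interval to attach to is
// rejected.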

/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
///
/// The reverse of [`plan_compute_replica_config`].
fn unplan_compute_replica_config(
    compute_replica_config: ComputeReplicaConfig,
) -> (Option<OptionalDuration>, bool) {
    match compute_replica_config.introspection {
        Some(ComputeReplicaIntrospectionConfig {
            debugging,
            interval,
        }) => (Some(OptionalDuration(Some(interval))), debugging),
        None => (Some(OptionalDuration(None)), false),
    }
}

/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
///
/// The reverse of [`unplan_cluster_schedule`].
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        // Otherwise we convert the `IntervalValue` to a `Duration`.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            if interval.months != 0 {
                // This limitation exists because we want this interval to be cleanly
                // convertible to a unix epoch timestamp difference. When the interval
                // involves months, that no longer holds, because months have variable lengths.
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}
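
// Illustrative example (not from the source): a schedule option of
// `ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour')` plans to
// `ClusterSchedule::Refresh { hydration_time_estimate: Duration::from_secs(3600) }`,
// while an estimate such as '1 month' is rejected because months do not have a
// fixed length in seconds.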

/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
///
/// The reverse of [`plan_cluster_schedule`].
fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
    match schedule {
        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
        ClusterSchedule::Refresh {
            hydration_time_estimate,
        } => {
            let interval = Interval::from_duration(&hydration_time_estimate)
                .expect("planning ensured that this is convertible back to Interval");
            let interval_value = literal::unplan_interval(&interval);
            ClusterScheduleOptionValue::Refresh {
                hydration_time_estimate: Some(interval_value),
            }
        }
    }
}

pub fn describe_create_cluster_replica(
    _: &StatementContext,
    _: CreateClusterReplicaStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}

pub fn describe_create_secret(
    _: &StatementContext,
    _: CreateSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_secret(
    scx: &StatementContext,
    stmt: CreateSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSecretStatement {
        name,
        if_not_exists,
        value,
    } = &stmt;

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
    let mut create_sql_statement = stmt.clone();
    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
    let create_sql =
        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
    let secret_as = query::plan_secret_as(scx, value.clone())?;

    let secret = Secret {
        create_sql,
        secret_as,
    };

    Ok(Plan::CreateSecret(CreateSecretPlan {
        name,
        secret,
        if_not_exists: *if_not_exists,
    }))
}
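
// Note (illustrative, not from the source): the redaction above means the
// catalog's stored statement for `CREATE SECRET s AS 'hunter2'` reads
// `CREATE SECRET s AS '********'`, so the plaintext never appears in catalog
// dumps; only the planned `secret_as` expression carries the real value.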

pub fn describe_create_connection(
    _: &StatementContext,
    _: CreateConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(CreateConnectionOption, (Validate, bool));

pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, overwrite the public key options based on the
    // connection details, in case we generated new keys during planning.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}

fn plan_drop_database(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedDatabaseName,
    cascade: bool,
) -> Result<Option<DatabaseId>, PlanError> {
    Ok(match resolve_database(scx, name, if_exists)? {
        Some(database) => {
            if !cascade && database.has_schemas() {
                sql_bail!(
                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
                    name,
                );
            }
            Some(database.id())
        }
        None => None,
    })
}

pub fn describe_drop_objects(
    _: &StatementContext,
    _: DropObjectsStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}

fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            // TODO(network_policy): When we support role based network policies, check if any role
            // currently has the specified policy set.
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns `true` if the cluster has any object that requires a single replica.
/// Returns `false` if the cluster has no objects.
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    // If this feature is enabled then all objects support multiple-replicas
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        // Otherwise we check for the existence of sources or sinks
5519        cluster.bound_objects().iter().any(|id| {
5520            let item = scx.catalog.get_item(id);
5521            matches!(
5522                item.item_type(),
5523                CatalogItemType::Sink | CatalogItemType::Source
5524            )
5525        })
5526    }
5527}
5528
5529fn plan_drop_cluster_replica(
5530    scx: &StatementContext,
5531    if_exists: bool,
5532    name: &QualifiedReplica,
5533) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5534    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5535    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5536}
5537
5538/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
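///
/// Without `CASCADE`, the drop is rejected if any dependent object remains; an
/// illustrative (hypothetical) sketch:
///
/// ```sql
/// CREATE VIEW v AS SELECT 1 AS a;
/// CREATE VIEW dependent AS SELECT a FROM v;
/// DROP VIEW v;          -- error: dependent objects still exist
/// DROP VIEW v CASCADE;  -- drops both v and dependent
/// ```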
fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `DROP VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
                //  relies on entry. Unfortunately, we don't have that information readily available.
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}

/// Does the dependency `dep` prevent a non-cascading drop of an object of type `object_type`?
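///
/// Indexes are the exception: they are dropped alongside their parent object, so
/// they never force `CASCADE`. An illustrative (hypothetical) sketch:
///
/// ```sql
/// CREATE INDEX v_idx ON v (a);
/// DROP VIEW v;  -- succeeds without CASCADE; v_idx is dropped with it
/// ```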
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        _ => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            CatalogItemType::Index => false,
        },
    }
}

pub fn describe_alter_index_options(
    _: &StatementContext,
    _: AlterIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_drop_owned(
    _: &StatementContext,
    _: DropOwnedStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

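/// Plans `DROP OWNED BY <role> [, ...] [CASCADE | RESTRICT]`: drops all objects
/// owned by the listed roles and revokes any privileges (including default
/// privileges) granted to them. A minimal sketch (role name hypothetical):
///
/// ```sql
/// DROP OWNED BY r1 CASCADE;
/// ```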
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Replicas
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            // Note: CASCADE is not required for replicas.
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                // Checks if any items still depend on this one, returning an error if so.
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                // When this item gets dropped it will also drop its progress source, so we need to
                // check the users of that source as well.
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}

fn plan_retain_history_option(
    scx: &StatementContext,
    retain_history: Option<OptionalDuration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    if let Some(OptionalDuration(lcw)) = retain_history {
        Ok(Some(plan_retain_history(scx, lcw)?))
    } else {
        Ok(None)
    }
}

// Converts a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
// `DisableCompaction`. A zero duration is an internal error, because the `OptionalDuration` type
// already converts a zero duration into `None`. This function must not be called in the `RESET
// (RETAIN HISTORY)` path, which should be handled by the outer `Option<OptionalDuration>` being
// `None`.
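//
// Illustrative mappings under these rules (durations hypothetical):
//
//   RETAIN HISTORY FOR '1hr' => Some(1hr) => a one-hour compaction window
//   RETAIN HISTORY FOR '0'   => None      => DisableCompaction (allowed only with
//                                            the unlimited-retain-history flag)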
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
        // disable compaction), and should never occur here. Furthermore, some things actually do
        // break when this is set to real zero:
        // https://github.com/MaterializeInc/database-issues/issues/3798.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Error if the duration is below the default and ENABLE_UNLIMITED_RETAIN_HISTORY is
            // not set (setting it should only be possible during testing).
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // In the past, `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction is
        // a dangerous choice, so we prevent it unless the unlimited-retain-history flag is set.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

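/// A minimal sketch of the `WITH` options this handles (index name and duration
/// hypothetical):
///
/// ```sql
/// CREATE INDEX v_idx ON v (a) WITH (RETAIN HISTORY FOR '1hr');
/// ```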
fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        // Index options are not durable.
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

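/// Plans `ALTER CLUSTER`, which sets or resets cluster options and can switch a
/// cluster between managed and unmanaged. A minimal sketch of the statements
/// this handles (names and values hypothetical):
///
/// ```sql
/// ALTER CLUSTER c SET (REPLICATION FACTOR 2);
/// ALTER CLUSTER c RESET (REPLICATION FACTOR);
/// ```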
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        // The total number of replicas that will be running is the internal
                        // replicas plus the replication factor.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // AlterClusterPlanStrategies other than None will stand up pending replicas
                        // of the new configuration, which would violate the single-replica
                        // constraint for sources. If the cluster contains any storage objects
                        // (sources or sinks), we should just fail.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

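/// Plans `ALTER ... SET CLUSTER`, which is currently supported for materialized
/// views only. A minimal sketch (names hypothetical):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET CLUSTER production;
/// ```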
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Prevent access to `SET CLUSTER` for unsupported objects.
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

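/// Plans `ALTER <object> RENAME TO`, dispatching on the object type. A minimal
/// sketch (names hypothetical):
///
/// ```sql
/// ALTER TABLE t RENAME TO t2;
/// ALTER CLUSTER c RENAME TO c2;
/// ```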
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Make sure the name is unique.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    // Prevent users from renaming system related schemas.
    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

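/// Plans `ALTER SCHEMA ... SWAP WITH ...`. The swap is implemented as a
/// three-step rename through a uniquely named temporary schema
/// (`mz_schema_swap_<suffix>`). A minimal sketch (names hypothetical):
///
/// ```sql
/// ALTER SCHEMA blue SWAP WITH green;
/// ```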
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // We cannot swap system schemas.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary name we can swap schema_a to.
    //
    // 'check' returns whether the temp schema name would be valid.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `ALTER VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // For PostgreSQL compatibility, items and types cannot have
            // overlapping names in a variety of situations. See the comment on
            // `CatalogItemType::conflicts_with_type` for details.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

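/// Plans `ALTER CLUSTER ... SWAP WITH ...`, the cluster analogue of the schema
/// swap above, using an `mz_cluster_swap_<suffix>` temporary name. A minimal
/// sketch (names hypothetical):
///
/// ```sql
/// ALTER CLUSTER blue SWAP WITH green;
/// ```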
pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // Temp name does not exist, so we can use it.
            Err(CatalogError::UnknownCluster(_)) => true,
            // Temp name already exists!
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // We'll try 10 times to generate a temporary suffix.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            // Call the provided closure to make sure this name is unique!
            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

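/// Plans `ALTER ... SET (RETAIN HISTORY ...)` and the corresponding `RESET`. A
/// minimal sketch (names and durations hypothetical):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '2d');
/// ALTER MATERIALIZED VIEW mv RESET (RETAIN HISTORY);
/// ```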
pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            // View gets a special error below.
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // Save the original value so we can write it back down in the create_sql catalog item.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                // None is RESET, so use the default compaction window.
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

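/// Plans `ALTER SECRET ... AS ...`. A minimal sketch (name and value
/// hypothetical):
///
/// ```sql
/// ALTER SECRET db_password AS 'new-password';
/// ```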
pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

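/// Plans `ALTER CONNECTION`, which either rotates the key pairs of an SSH
/// connection or sets/drops individual connection options. A minimal sketch
/// (names and values hypothetical):
///
/// ```sql
/// ALTER CONNECTION ssh_conn ROTATE KEYS;
/// ALTER CONNECTION kafka_conn SET (BROKER 'broker:9092') WITH (VALIDATE = true);
/// ```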
6821pub fn plan_alter_connection(
6822    scx: &StatementContext,
6823    stmt: AlterConnectionStatement<Aug>,
6824) -> Result<Plan, PlanError> {
6825    let AlterConnectionStatement {
6826        name,
6827        if_exists,
6828        actions,
6829        with_options,
6830    } = stmt;
6831    let conn_name = normalize::unresolved_item_name(name)?;
6832    let entry = match scx.catalog.resolve_item(&conn_name) {
6833        Ok(entry) => entry,
6834        Err(_) if if_exists => {
6835            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6836                name: conn_name.to_string(),
6837                object_type: ObjectType::Sink,
6838            });
6839
6840            return Ok(Plan::AlterNoop(AlterNoopPlan {
6841                object_type: ObjectType::Connection,
6842            }));
6843        }
6844        Err(e) => return Err(e.into()),
6845    };
6846
6847    let connection = entry.connection()?;
6848
6849    if actions
6850        .iter()
6851        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6852    {
6853        if actions.len() > 1 {
6854            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6855        }
6856
6857        if !with_options.is_empty() {
6858            sql_bail!(
6859                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6860                with_options
6861                    .iter()
6862                    .map(|o| o.to_ast_string_simple())
6863                    .join(", ")
6864            );
6865        }
6866
6867        if !matches!(connection, Connection::Ssh(_)) {
6868            sql_bail!(
6869                "{} is not an SSH connection",
6870                scx.catalog.resolve_full_name(entry.name())
6871            )
6872        }
6873
6874        return Ok(Plan::AlterConnection(AlterConnectionPlan {
6875            id: entry.id(),
6876            action: crate::plan::AlterConnectionAction::RotateKeys,
6877        }));
6878    }
6879
6880    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6881    if options.validate.is_some() {
6882        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6883    }
6884
6885    let validate = match options.validate {
6886        Some(val) => val,
6887        None => {
6888            scx.catalog
6889                .system_vars()
6890                .enable_default_connection_validation()
6891                && connection.validate_by_default()
6892        }
6893    };
6894
6895    let connection_type = match connection {
6896        Connection::Aws(_) => CreateConnectionType::Aws,
6897        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6898        Connection::Kafka(_) => CreateConnectionType::Kafka,
6899        Connection::Csr(_) => CreateConnectionType::Csr,
6900        Connection::Postgres(_) => CreateConnectionType::Postgres,
6901        Connection::Ssh(_) => CreateConnectionType::Ssh,
6902        Connection::MySql(_) => CreateConnectionType::MySql,
6903        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6904        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
6905    };
6906
6907    // Collect all options irrespective of action taken on them.
6908    let specified_options: BTreeSet<_> = actions
6909        .iter()
6910        .map(|action: &AlterConnectionAction<Aug>| match action {
6911            AlterConnectionAction::SetOption(option) => option.name.clone(),
6912            AlterConnectionAction::DropOption(name) => name.clone(),
6913            AlterConnectionAction::RotateKeys => unreachable!(),
6914        })
6915        .collect();
6916
6917    for invalid in INALTERABLE_OPTIONS {
6918        if specified_options.contains(invalid) {
6919            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6920        }
6921    }
6922
6923    connection::validate_options_per_connection_type(connection_type, specified_options)?;
6924
6925    // Partition operations into set and drop
6926    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
6927        actions.into_iter().partition_map(|action| match action {
6928            AlterConnectionAction::SetOption(option) => Either::Left(option),
6929            AlterConnectionAction::DropOption(name) => Either::Right(name),
6930            AlterConnectionAction::RotateKeys => unreachable!(),
6931        });
6932
6933    let set_options: BTreeMap<_, _> = set_options_vec
6934        .clone()
6935        .into_iter()
6936        .map(|option| (option.name, option.value))
6937        .collect();
6938
6939    // Type check the SET values and detect duplicates; we don't want to, e.g.,
6940    // let users DROP and SET the same option in the same statement, so treating
6941    // the dropped options as if they were also set is fine for this check.
6942    let connection_options_extracted =
6943        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
6944
6945    let duplicates: Vec<_> = connection_options_extracted
6946        .seen
6947        .intersection(&drop_options)
6948        .collect();
6949
6950    if !duplicates.is_empty() {
6951        sql_bail!(
6952            "cannot both SET and DROP/RESET options {}",
6953            duplicates
6954                .iter()
6955                .map(|option| option.to_string())
6956                .join(", ")
6957        )
6958    }
6959
6960    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
6961        let set_options_count = mutually_exclusive_options
6962            .iter()
6963            .filter(|o| set_options.contains_key(o))
6964            .count();
6965        let drop_options_count = mutually_exclusive_options
6966            .iter()
6967            .filter(|o| drop_options.contains(o))
6968            .count();
6969
6970        // Disallow setting _and_ resetting mutually exclusive options
6971        if set_options_count > 0 && drop_options_count > 0 {
6972            sql_bail!(
6973                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
6974                connection_type,
6975                mutually_exclusive_options
6976                    .iter()
6977                    .map(|option| option.to_string())
6978                    .join(", ")
6979            )
6980        }
6981
6982        // If any option is either set or dropped, ensure all mutually exclusive
6983        // options are dropped. We do this "behind the scenes", even though we
6984        // disallow users from performing the same action because this is the
6985        // mechanism by which we overwrite values elsewhere in the code.
6986        if set_options_count > 0 || drop_options_count > 0 {
6987            drop_options.extend(mutually_exclusive_options.iter().cloned());
6988        }
6989
6990        // n.b. if mutually exclusive options are set, those will error when we
6991        // try to replan the connection.
6992    }
6993
6994    Ok(Plan::AlterConnection(AlterConnectionPlan {
6995        id: entry.id(),
6996        action: crate::plan::AlterConnectionAction::AlterOptions {
6997            set_options,
6998            drop_options,
6999            validate,
7000        },
7001    }))
7002}
7003
7004pub fn describe_alter_sink(
7005    _: &StatementContext,
7006    _: AlterSinkStatement<Aug>,
7007) -> Result<StatementDesc, PlanError> {
7008    Ok(StatementDesc::new(None))
7009}
7010
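/// Plans an `ALTER SINK` statement.
///
/// Changing the sink's upstream relation, e.g. (assumed syntax)
/// `ALTER SINK snk SET FROM new_upstream`, is implemented by re-planning the
/// sink's original `CREATE SINK` statement with the new relation swapped in.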
7011pub fn plan_alter_sink(
7012    scx: &mut StatementContext,
7013    stmt: AlterSinkStatement<Aug>,
7014) -> Result<Plan, PlanError> {
7015    let AlterSinkStatement {
7016        sink_name,
7017        if_exists,
7018        action,
7019    } = stmt;
7020
7021    let object_type = ObjectType::Sink;
7022    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7023
7024    let Some(item) = item else {
7025        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7026            name: sink_name.to_string(),
7027            object_type,
7028        });
7029
7030        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7031    };
7032    // Always ALTER objects from their latest version.
7033    let item = item.at_version(RelationVersionSelector::Latest);
7034
7035    match action {
7036        AlterSinkAction::ChangeRelation(new_from) => {
7037            // First we reconstruct the original CREATE SINK statement
7038            let create_sql = item.create_sql();
7039            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7040            let [stmt]: [StatementParseResult; 1] = stmts
7041                .try_into()
7042                .expect("create sql of sink was not exactly one statement");
7043            let Statement::CreateSink(stmt) = stmt.ast else {
7044                unreachable!("invalid create SQL for sink item");
7045            };
7046
7047            // Then resolve and swap the resolved from relation to the new one
7048            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7049            stmt.from = new_from;
7050
7051            // Finally re-plan the modified create sink statement to verify the new configuration is valid
7052            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7053                unreachable!("invalid plan for CREATE SINK statement");
7054            };
7055
7056            plan.sink.version += 1;
7057
7058            Ok(Plan::AlterSink(AlterSinkPlan {
7059                item_id: item.id(),
7060                global_id: item.global_id(),
7061                sink: plan.sink,
7062                with_snapshot: plan.with_snapshot,
7063                in_cluster: plan.in_cluster,
7064            }))
7065        }
7066        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7067        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7068    }
7069}
7070
7071pub fn describe_alter_source(
7072    _: &StatementContext,
7073    _: AlterSourceStatement<Aug>,
7074) -> Result<StatementDesc, PlanError> {
7075    // TODO: should the statement's options be described here?
7076    Ok(StatementDesc::new(None))
7077}
7078
7079generate_extracted_config!(
7080    AlterSourceAddSubsourceOption,
7081    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7082    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7083    (Details, String)
7084);
7085
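/// Plans an `ALTER SOURCE` statement.
///
/// Only the `RETAIN HISTORY` option can be set or reset here, e.g. (assumed
/// syntax) `ALTER SOURCE src RESET (RETAIN HISTORY)`; subsource changes are
/// handled during purification instead.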
7086pub fn plan_alter_source(
7087    scx: &mut StatementContext,
7088    stmt: AlterSourceStatement<Aug>,
7089) -> Result<Plan, PlanError> {
7090    let AlterSourceStatement {
7091        source_name,
7092        if_exists,
7093        action,
7094    } = stmt;
7095    let object_type = ObjectType::Source;
7096
7097    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7098        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7099            name: source_name.to_string(),
7100            object_type,
7101        });
7102
7103        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7104    }
7105
7106    match action {
7107        AlterSourceAction::SetOptions(options) => {
7108            let mut options = options.into_iter();
7109            let option = options.next().unwrap();
7110            if option.name == CreateSourceOptionName::RetainHistory {
7111                if options.next().is_some() {
7112                    sql_bail!("RETAIN HISTORY must be the only option");
7113                }
7114                return alter_retain_history(
7115                    scx,
7116                    object_type,
7117                    if_exists,
7118                    UnresolvedObjectName::Item(source_name),
7119                    option.value,
7120                );
7121            }
7122            // N.B. we use this statement in purification in a way that cannot be
7123            // planned directly.
7124            sql_bail!(
7125                "Cannot modify the {} of a SOURCE.",
7126                option.name.to_ast_string_simple()
7127            );
7128        }
7129        AlterSourceAction::ResetOptions(reset) => {
7130            let mut options = reset.into_iter();
7131            let option = options.next().unwrap();
7132            if option == CreateSourceOptionName::RetainHistory {
7133                if options.next().is_some() {
7134                    sql_bail!("RETAIN HISTORY must be the only option");
7135                }
7136                return alter_retain_history(
7137                    scx,
7138                    object_type,
7139                    if_exists,
7140                    UnresolvedObjectName::Item(source_name),
7141                    None,
7142                );
7143            }
7144            sql_bail!(
7145                "Cannot modify the {} of a SOURCE.",
7146                option.to_ast_string_simple()
7147            );
7148        }
7149        AlterSourceAction::DropSubsources { .. } => {
7150            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7151        }
7152        AlterSourceAction::AddSubsources { .. } => {
7153            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7154        }
7155        AlterSourceAction::RefreshReferences => {
7156            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7157        }
7158    };
7159}
7160
7161pub fn describe_alter_system_set(
7162    _: &StatementContext,
7163    _: AlterSystemSetStatement,
7164) -> Result<StatementDesc, PlanError> {
7165    Ok(StatementDesc::new(None))
7166}
7167
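/// Plans an `ALTER SYSTEM SET` statement, e.g.
/// `ALTER SYSTEM SET enable_default_connection_validation = on` (illustrative;
/// the variable name is not validated at this stage).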
7168pub fn plan_alter_system_set(
7169    _: &StatementContext,
7170    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7171) -> Result<Plan, PlanError> {
7172    let name = name.to_string();
7173    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7174        name,
7175        value: scl::plan_set_variable_to(to)?,
7176    }))
7177}
7178
7179pub fn describe_alter_system_reset(
7180    _: &StatementContext,
7181    _: AlterSystemResetStatement,
7182) -> Result<StatementDesc, PlanError> {
7183    Ok(StatementDesc::new(None))
7184}
7185
7186pub fn plan_alter_system_reset(
7187    _: &StatementContext,
7188    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7189) -> Result<Plan, PlanError> {
7190    let name = name.to_string();
7191    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7192}
7193
7194pub fn describe_alter_system_reset_all(
7195    _: &StatementContext,
7196    _: AlterSystemResetAllStatement,
7197) -> Result<StatementDesc, PlanError> {
7198    Ok(StatementDesc::new(None))
7199}
7200
7201pub fn plan_alter_system_reset_all(
7202    _: &StatementContext,
7203    _: AlterSystemResetAllStatement,
7204) -> Result<Plan, PlanError> {
7205    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7206}
7207
7208pub fn describe_alter_role(
7209    _: &StatementContext,
7210    _: AlterRoleStatement<Aug>,
7211) -> Result<StatementDesc, PlanError> {
7212    Ok(StatementDesc::new(None))
7213}
7214
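/// Plans an `ALTER ROLE` statement, covering both role attribute changes and
/// role variable defaults, e.g. (assumed syntax)
/// `ALTER ROLE r SET cluster = 'prod'`.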
7215pub fn plan_alter_role(
7216    _scx: &StatementContext,
7217    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7218) -> Result<Plan, PlanError> {
7219    let option = match option {
7220        AlterRoleOption::Attributes(attrs) => {
7221            let attrs = plan_role_attributes(attrs)?;
7222            PlannedAlterRoleOption::Attributes(attrs)
7223        }
7224        AlterRoleOption::Variable(variable) => {
7225            let var = plan_role_variable(variable)?;
7226            PlannedAlterRoleOption::Variable(var)
7227        }
7228    };
7229
7230    Ok(Plan::AlterRole(AlterRolePlan {
7231        id: name.id,
7232        name: name.name,
7233        option,
7234    }))
7235}
7236
7237pub fn describe_alter_table_add_column(
7238    _: &StatementContext,
7239    _: AlterTableAddColumnStatement<Aug>,
7240) -> Result<StatementDesc, PlanError> {
7241    Ok(StatementDesc::new(None))
7242}
7243
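/// Plans `ALTER TABLE ... ADD COLUMN`, e.g. (illustrative)
/// `ALTER TABLE t ADD COLUMN IF NOT EXISTS c text`. This is feature-flag
/// gated, and new columns are always planned as nullable for now.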
7244pub fn plan_alter_table_add_column(
7245    scx: &StatementContext,
7246    stmt: AlterTableAddColumnStatement<Aug>,
7247) -> Result<Plan, PlanError> {
7248    let AlterTableAddColumnStatement {
7249        if_exists,
7250        name,
7251        if_col_not_exist,
7252        column_name,
7253        data_type,
7254    } = stmt;
7255    let object_type = ObjectType::Table;
7256
7257    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7258
7259    let (relation_id, item_name, desc) =
7260        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7261            Some(item) => {
7262                // Always add columns to the latest version of the item.
7263                let item_name = scx.catalog.resolve_full_name(item.name());
7264                let item = item.at_version(RelationVersionSelector::Latest);
7265                let desc = item.desc(&item_name)?.into_owned();
7266                (item.id(), item_name, desc)
7267            }
7268            None => {
7269                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7270                    name: name.to_ast_string_simple(),
7271                    object_type,
7272                });
7273                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7274            }
7275        };
7276
7277    let column_name = ColumnName::from(column_name.as_str());
7278    if desc.get_by_name(&column_name).is_some() {
7279        if if_col_not_exist {
7280            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7281                column_name: column_name.to_string(),
7282                object_name: item_name.item,
7283            });
7284            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7285        } else {
7286            return Err(PlanError::ColumnAlreadyExists {
7287                column_name,
7288                object_name: item_name.item,
7289            });
7290        }
7291    }
7292
7293    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7294    // TODO(alter_table): Support non-nullable columns with default values.
7295    let column_type = scalar_type.nullable(true);
7296    // "unresolve" our data type so we can later update the persisted create_sql.
7297    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7298
7299    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7300        relation_id,
7301        column_name,
7302        column_type,
7303        raw_sql_type,
7304    }))
7305}
7306
7307pub fn describe_comment(
7308    _: &StatementContext,
7309    _: CommentStatement<Aug>,
7310) -> Result<StatementDesc, PlanError> {
7311    Ok(StatementDesc::new(None))
7312}
7313
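/// Plans a `COMMENT ON` statement, e.g. (illustrative)
/// `COMMENT ON TABLE t IS 'staging data'`. As in PostgreSQL, `IS NULL` is
/// presumably how a comment is cleared, which is why `comment` is optional.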
7314pub fn plan_comment(
7315    scx: &mut StatementContext,
7316    stmt: CommentStatement<Aug>,
7317) -> Result<Plan, PlanError> {
7318    const MAX_COMMENT_LENGTH: usize = 1024;
7319
7320    let CommentStatement { object, comment } = stmt;
7321
7322    // TODO(parkmycar): Make max comment length configurable.
7323    if let Some(c) = &comment {
7324        if c.len() > MAX_COMMENT_LENGTH {
7325            return Err(PlanError::CommentTooLong {
7326                length: c.len(),
7327                max_size: MAX_COMMENT_LENGTH,
7328            });
7329        }
7330    }
7331
7332    let (object_id, column_pos) = match &object {
7333        com_ty @ CommentObjectType::Table { name }
7334        | com_ty @ CommentObjectType::View { name }
7335        | com_ty @ CommentObjectType::MaterializedView { name }
7336        | com_ty @ CommentObjectType::Index { name }
7337        | com_ty @ CommentObjectType::Func { name }
7338        | com_ty @ CommentObjectType::Connection { name }
7339        | com_ty @ CommentObjectType::Source { name }
7340        | com_ty @ CommentObjectType::Sink { name }
7341        | com_ty @ CommentObjectType::Secret { name }
7342        | com_ty @ CommentObjectType::ContinualTask { name } => {
7343            let item = scx.get_item_by_resolved_name(name)?;
7344            match (com_ty, item.item_type()) {
7345                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7346                    (CommentObjectId::Table(item.id()), None)
7347                }
7348                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7349                    (CommentObjectId::View(item.id()), None)
7350                }
7351                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7352                    (CommentObjectId::MaterializedView(item.id()), None)
7353                }
7354                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7355                    (CommentObjectId::Index(item.id()), None)
7356                }
7357                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7358                    (CommentObjectId::Func(item.id()), None)
7359                }
7360                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7361                    (CommentObjectId::Connection(item.id()), None)
7362                }
7363                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7364                    (CommentObjectId::Source(item.id()), None)
7365                }
7366                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7367                    (CommentObjectId::Sink(item.id()), None)
7368                }
7369                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7370                    (CommentObjectId::Secret(item.id()), None)
7371                }
7372                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7373                    (CommentObjectId::ContinualTask(item.id()), None)
7374                }
7375                (com_ty, cat_ty) => {
7376                    let expected_type = match com_ty {
7377                        CommentObjectType::Table { .. } => ObjectType::Table,
7378                        CommentObjectType::View { .. } => ObjectType::View,
7379                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7380                        CommentObjectType::Index { .. } => ObjectType::Index,
7381                        CommentObjectType::Func { .. } => ObjectType::Func,
7382                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7383                        CommentObjectType::Source { .. } => ObjectType::Source,
7384                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7385                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7386                        _ => unreachable!("these are the only types we match on"),
7387                    };
7388
7389                    return Err(PlanError::InvalidObjectType {
7390                        expected_type: SystemObjectType::Object(expected_type),
7391                        actual_type: SystemObjectType::Object(cat_ty.into()),
7392                        object_name: item.name().item.clone(),
7393                    });
7394                }
7395            }
7396        }
7397        CommentObjectType::Type { ty } => match ty {
7398            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7399                sql_bail!("cannot comment on anonymous list or map type");
7400            }
7401            ResolvedDataType::Named { id, modifiers, .. } => {
7402                if !modifiers.is_empty() {
7403                    sql_bail!("cannot comment on type with modifiers");
7404                }
7405                (CommentObjectId::Type(*id), None)
7406            }
7407            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7408        },
7409        CommentObjectType::Column { name } => {
7410            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7411            match item.item_type() {
7412                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7413                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7414                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7415                CatalogItemType::MaterializedView => {
7416                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7417                }
7418                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7419                r => {
7420                    return Err(PlanError::Unsupported {
7421                        feature: format!("Specifying comments on a column of {r}"),
7422                        discussion_no: None,
7423                    });
7424                }
7425            }
7426        }
7427        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7428        CommentObjectType::Database { name } => {
7429            (CommentObjectId::Database(*name.database_id()), None)
7430        }
7431        CommentObjectType::Schema { name } => (
7432            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7433            None,
7434        ),
7435        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7436        CommentObjectType::ClusterReplica { name } => {
7437            let replica = scx.catalog.resolve_cluster_replica(name)?;
7438            (
7439                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7440                None,
7441            )
7442        }
7443        CommentObjectType::NetworkPolicy { name } => {
7444            (CommentObjectId::NetworkPolicy(name.id), None)
7445        }
7446    };
7447
7448    // Note: the `mz_comments` table uses an `Int4` for the column position, but in catalog
7449    // storage we store a `usize`, which would be a `Uint8`. We do a checked conversion here
7450    // because it's the easiest place to raise an error.
7451    //
7452    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7453    if let Some(p) = column_pos {
7454        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7455            max_num_columns: MAX_NUM_COLUMNS,
7456            req_num_columns: p,
7457        })?;
7458    }
7459
7460    Ok(Plan::Comment(CommentPlan {
7461        object_id,
7462        sub_component: column_pos,
7463        comment,
7464    }))
7465}
7466
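/// Resolves `name` to a cluster, returning `Ok(None)` instead of an error when
/// the cluster does not exist and `if_exists` is true.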
7467pub(crate) fn resolve_cluster<'a>(
7468    scx: &'a StatementContext,
7469    name: &'a Ident,
7470    if_exists: bool,
7471) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7472    match scx.resolve_cluster(Some(name)) {
7473        Ok(cluster) => Ok(Some(cluster)),
7474        Err(_) if if_exists => Ok(None),
7475        Err(e) => Err(e),
7476    }
7477}
7478
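/// Resolves `name` to a cluster and replica ID, returning `Ok(None)` when the
/// cluster or replica does not exist and `if_exists` is true.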
7479pub(crate) fn resolve_cluster_replica<'a>(
7480    scx: &'a StatementContext,
7481    name: &QualifiedReplica,
7482    if_exists: bool,
7483) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7484    match scx.resolve_cluster(Some(&name.cluster)) {
7485        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7486            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7487            None if if_exists => Ok(None),
7488            None => Err(sql_err!(
7489                "CLUSTER {} has no CLUSTER REPLICA named {}",
7490                cluster.name(),
7491                name.replica.as_str().quoted(),
7492            )),
7493        },
7494        Err(_) if if_exists => Ok(None),
7495        Err(e) => Err(e),
7496    }
7497}
7498
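/// Resolves `name` to a database, returning `Ok(None)` when it does not exist
/// and `if_exists` is true.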
7499pub(crate) fn resolve_database<'a>(
7500    scx: &'a StatementContext,
7501    name: &'a UnresolvedDatabaseName,
7502    if_exists: bool,
7503) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7504    match scx.resolve_database(name) {
7505        Ok(database) => Ok(Some(database)),
7506        Err(_) if if_exists => Ok(None),
7507        Err(e) => Err(e),
7508    }
7509}
7510
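/// Resolves `name` to a schema's database and schema specifiers, returning
/// `Ok(None)` when the schema does not exist and `if_exists` is true.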
7511pub(crate) fn resolve_schema<'a>(
7512    scx: &'a StatementContext,
7513    name: UnresolvedSchemaName,
7514    if_exists: bool,
7515) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7516    match scx.resolve_schema(name) {
7517        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7518        Err(_) if if_exists => Ok(None),
7519        Err(e) => Err(e),
7520    }
7521}
7522
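/// Resolves `name` to a network policy, returning `Ok(None)` when it does not
/// exist and `if_exists` is true.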
7523pub(crate) fn resolve_network_policy<'a>(
7524    scx: &'a StatementContext,
7525    name: Ident,
7526    if_exists: bool,
7527) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7528    match scx.catalog.resolve_network_policy(&name.to_string()) {
7529        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7530            id: policy.id(),
7531            name: policy.name().to_string(),
7532        })),
7533        Err(_) if if_exists => Ok(None),
7534        Err(e) => Err(e.into()),
7535    }
7536}
7537
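/// Resolves `name` to a catalog item, requiring that its type match
/// `object_type`. Returns `Ok(None)` when the item does not exist and
/// `if_exists` is true; resolving to an item of a different type is an error.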
7538pub(crate) fn resolve_item_or_type<'a>(
7539    scx: &'a StatementContext,
7540    object_type: ObjectType,
7541    name: UnresolvedItemName,
7542    if_exists: bool,
7543) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7544    let name = normalize::unresolved_item_name(name)?;
7545    let catalog_item = match object_type {
7546        ObjectType::Type => scx.catalog.resolve_type(&name),
7547        _ => scx.catalog.resolve_item(&name),
7548    };
7549
7550    match catalog_item {
7551        Ok(item) => {
7552            let is_type = ObjectType::from(item.item_type());
7553            if object_type == is_type {
7554                Ok(Some(item))
7555            } else {
7556                Err(PlanError::MismatchedObjectType {
7557                    name: scx.catalog.minimal_qualification(item.name()),
7558                    is_type,
7559                    expected_type: object_type,
7560                })
7561            }
7562        }
7563        Err(_) if if_exists => Ok(None),
7564        Err(e) => Err(e.into()),
7565    }
7566}
7567
7568/// Returns an error if the given cluster is a managed cluster.
7569fn ensure_cluster_is_not_managed(
7570    scx: &StatementContext,
7571    cluster_id: ClusterId,
7572) -> Result<(), PlanError> {
7573    let cluster = scx.catalog.get_cluster(cluster_id);
7574    if cluster.is_managed() {
7575        Err(PlanError::ManagedCluster {
7576            cluster_name: cluster.name().to_string(),
7577        })
7578    } else {
7579        Ok(())
7580    }
7581}