mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, ColumnType, RelationDesc, RelationType, RelationVersion,
    RelationVersionSelector, ScalarType, Timestamp, VersionedRelationDesc, preserves_order,
    strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, SinkEnvelope,
    StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSource, SqlServerSourceExtras,
    Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    Ingestion, MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction,
    NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice, PolicyAddress, QueryContext,
    ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View,
    WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal,
    plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

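// Matches managed replica names such as `r1` or `r42`: a literal `r` followed
// by one or more digits.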
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc's columns;
/// - all the listed columns have types with a meaningful Persist-level ordering.
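///
/// A minimal illustrative call (hypothetical desc and column names):
///
/// ```ignore
/// // Accepts `(region, ts)` when they are the desc's first two columns and
/// // both types preserve order; errors otherwise.
/// check_partition_by(&desc, vec![ident!("region"), ident!("ts")])?;
/// ```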
fn check_partition_by(desc: &RelationDesc, partition_by: Vec<Ident>) -> Result<(), PlanError> {
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc.iter().zip(partition_by.into_iter()).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

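/// Plans `CREATE SCHEMA`. For example (illustrative names): `CREATE SCHEMA s`
/// resolves against the active database, `CREATE SCHEMA db.s` resolves `db`
/// explicitly, and `CREATE SCHEMA a.b.c` is rejected for having more than two
/// name components.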
pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

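/// Plans `CREATE TABLE`. For example, a statement such as (hypothetical
/// names)
///
/// ```ignore
/// CREATE TABLE t (a int NOT NULL, b text DEFAULT 'x');
/// ```
///
/// becomes a `CreateTablePlan` whose relation description records `a` as
/// non-nullable; key-producing constraints are accepted only behind the
/// `UNSAFE_*` feature flags, since they are not enforced at runtime.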
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = RelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

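// Each `generate_extracted_config!` invocation below derives an
// `...OptionExtracted` struct (e.g. `PgConfigOptionExtracted`) with one field
// per declared option plus a `seen` set; callers destructure it with
// `options.clone().try_into()?`, as `plan_create_source` does.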
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

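/// Plans `CREATE SOURCE ... FROM WEBHOOK`. For instance (hypothetical names),
///
/// ```ignore
/// CREATE SOURCE hooks IN CLUSTER c FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS;
/// ```
///
/// yields a relation with a `body` column followed by a `headers` map column.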
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        ColumnType {
            scalar_type: ScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(ColumnType {
            scalar_type: ScalarType::Map {
                value_type: Box::new(ScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(ScalarType::Bytes)
            .unwrap_or(ScalarType::String);
        column_ty.push(ColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = RelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

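/// Plans `CREATE SOURCE` for external systems (Kafka, PostgreSQL/Yugabyte,
/// MySQL, SQL Server, and load generators). An illustrative statement
/// (hypothetical names):
///
/// ```ignore
/// CREATE SOURCE src IN CLUSTER ingest
///     FROM KAFKA CONNECTION kconn (TOPIC 'events')
///     FORMAT JSON ENVELOPE UPSERT;
/// ```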
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all options related to the
    // primary source output must be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    let external_connection = match source_connection {
        CreateSourceConnection::Kafka {
            connection: connection_name,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection_name)?;
            if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
                sql_bail!(
                    "{} is not a kafka connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let KafkaSourceConfigOptionExtracted {
                group_id_prefix,
                topic,
                topic_metadata_refresh_interval,
                start_timestamp: _, // purified into `start_offset`
                start_offset,
                seen: _,
            }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;

            let topic = topic.expect("validated exists during purification");

            let mut start_offsets = BTreeMap::new();
            if let Some(offsets) = start_offset {
                for (part, offset) in offsets.iter().enumerate() {
                    if *offset < 0 {
                        sql_bail!("START OFFSET must be a nonnegative integer");
                    }
                    start_offsets.insert(i32::try_from(part)?, *offset);
                }
            }

            if !start_offsets.is_empty() && envelope.requires_all_input() {
                sql_bail!("START OFFSET is not supported with ENVELOPE {}", envelope)
            }

            if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
                // This is a librdkafka-enforced restriction that, if violated,
                // would result in a runtime error for the source.
                sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
            }

            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                // TODO(guswynn): should this be `bail_unsupported!`?
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // This defines the metadata columns for the primary export of this kafka source,
            // not any tables that are created from it.
            // TODO: Remove this when we stop outputting to the primary export of a source.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // handled below
                        None
                    }
                })
                .collect();

            let connection = KafkaSourceConnection::<ReferencedConnection> {
                connection: connection_item.id(),
                connection_id: connection_item.id(),
                topic,
                start_offsets,
                group_id_prefix,
                topic_metadata_refresh_interval,
                metadata_columns,
            };

            GenericSourceConnection::Kafka(connection)
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        }
        | CreateSourceConnection::Yugabyte {
            connection,
            options,
        } => {
            let (source_flavor, flavor_name) = match source_connection {
                CreateSourceConnection::Postgres { .. } => (PostgresFlavor::Vanilla, "PostgreSQL"),
                CreateSourceConnection::Yugabyte { .. } => (PostgresFlavor::Yugabyte, "Yugabyte"),
                _ => unreachable!(),
            };

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection_mismatch = match connection_item.connection()? {
                Connection::Postgres(conn) => source_flavor != conn.flavor,
                _ => true,
            };
            if connection_mismatch {
                sql_bail!(
                    "{} is not a {flavor_name} connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let PgConfigOptionExtracted {
                details,
                publication,
                // text columns are already part of the source-exports and are only included
                // in these options for round-tripping of a `CREATE SOURCE` statement. This should
                // be removed once we drop support for implicitly created subsources.
                text_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details = ProtoPostgresSourcePublicationDetails::decode(&*details)
                .map_err(|e| sql_err!("{}", e))?;

            let publication_details = PostgresSourcePublicationDetails::from_proto(details)
                .map_err(|e| sql_err!("{}", e))?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(PostgresSourceConnection {
                    connection: connection_item.id(),
                    connection_id: connection_item.id(),
                    publication: publication.expect("validated exists during purification"),
                    publication_details,
                });

            connection
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::SqlServer(connection) => connection,
                _ => sql_bail!(
                    "{} is not a SQL Server connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };

            let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
            let extras = hex::decode(details)
                .map_err(|e| sql_err!("{e}"))
                .and_then(|raw| {
                    ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}"))
                })
                .and_then(|proto| {
                    SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}"))
                })?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(SqlServerSource {
                    catalog_id: connection_item.id(),
                    connection: connection_item.id(),
                    extras,
                });

            connection
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::MySql(connection) => connection,
                _ => sql_bail!(
                    "{} is not a MySQL connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };
            let MySqlConfigOptionExtracted {
                details,
                // text/exclude columns are already part of the source-exports and are only included
                // in these options for round-tripping of a `CREATE SOURCE` statement. This should
                // be removed once we drop support for implicitly created subsources.
                text_columns: _,
                exclude_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details =
                ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
            let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(MySqlSourceConnection {
                    connection: connection_item.id(),
                    connection_id: connection_item.id(),
                    details,
                });

            connection
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(scx, generator, options, include_metadata)?;

            let LoadGeneratorOptionExtracted {
                tick_interval,
                as_of,
                up_to,
                ..
            } = options.clone().try_into()?;
            let tick_micros = match tick_interval {
                Some(interval) => Some(interval.as_micros().try_into()?),
                None => None,
            };

            if up_to < as_of {
                sql_bail!("UP TO cannot be less than AS OF");
            }

            let connection = GenericSourceConnection::from(LoadGeneratorSourceConnection {
                load_generator,
                tick_micros,
                as_of,
                up_to,
            });

            connection
        }
    };

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
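    // (e.g. a `PRIMARY KEY (id) NOT ENFORCED` clause; illustrative column name).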
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (primary_export, primary_export_details) = if force_source_table_syntax {
        (
            SourceExportDataConfig {
                encoding: None,
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
            },
            SourceExportDetails::None,
        )
    } else {
        (
            SourceExportDataConfig {
                encoding,
                envelope: envelope.clone(),
            },
            external_connection.primary_export_details(),
        )
    };
    let source_desc = SourceDesc::<ReferencedConnection> {
        connection: external_connection,
        // We only define primary-export details for this source if we are still supporting
        // the legacy source syntax. Otherwise, we will not output to the primary collection.
        // TODO(database-issues#8620): Remove this field once the new syntax is enabled everywhere
        primary_export,
        primary_export_details,
        timestamp_interval,
    };

    let progress_subsource = match progress_subsource {
        Some(name) => match name {
            DeferredItemName::Named(name) => match name {
                ResolvedItemName::Item { id, .. } => *id,
                ResolvedItemName::Cte { .. }
                | ResolvedItemName::ContinualTask { .. }
                | ResolvedItemName::Error => {
                    sql_bail!("[internal error] invalid target id")
                }
            },
            DeferredItemName::Deferred(_) => {
                sql_bail!("[internal error] progress subsource must be named during purification")
            }
        },
        _ => sql_bail!("[internal error] progress subsource must be named during purification"),
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source: DataSourceDesc::Ingestion(Ingestion {
            desc: source_desc,
            progress_subsource,
        }),
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

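/// Computes the relation description, planned envelope, and optional encoding
/// for a source's primary export. In sketch: when a format is present it is
/// planned into a `SourceDataEncoding` whose key/value descs replace the
/// connection's defaults, and the envelope (`NONE`, `DEBEZIUM`, `UPSERT`, ...)
/// then determines the final shape of the desc.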
1263fn apply_source_envelope_encoding(
1264    scx: &StatementContext,
1265    envelope: &ast::SourceEnvelope,
1266    format: &Option<FormatSpecifier<Aug>>,
1267    key_desc: Option<RelationDesc>,
1268    value_desc: RelationDesc,
1269    include_metadata: &[SourceIncludeMetadata],
1270    metadata_columns_desc: Vec<(&str, ColumnType)>,
1271    source_connection: &GenericSourceConnection<ReferencedConnection>,
1272) -> Result<
1273    (
1274        RelationDesc,
1275        SourceEnvelope,
1276        Option<SourceDataEncoding<ReferencedConnection>>,
1277    ),
1278    PlanError,
1279> {
1280    let encoding = match format {
1281        Some(format) => Some(get_encoding(scx, format, envelope)?),
1282        None => None,
1283    };
1284
1285    let (key_desc, value_desc) = match &encoding {
1286        Some(encoding) => {
1287            // If we are applying an encoding we need to ensure that the incoming value_desc is a
1288            // single column of type bytes.
1289            match value_desc.typ().columns() {
1290                [typ] => match typ.scalar_type {
1291                    ScalarType::Bytes => {}
1292                    _ => sql_bail!(
1293                        "The schema produced by the source is incompatible with format decoding"
1294                    ),
1295                },
1296                _ => sql_bail!(
1297                    "The schema produced by the source is incompatible with format decoding"
1298                ),
1299            }
1300
1301            let (key_desc, value_desc) = encoding.desc()?;
1302
1303            // TODO(petrosagg): This piece of code seems to be making a statement about the
1304            // nullability of the NONE envelope when the source is Kafka. As written, the code
1305            // misses opportunities to mark columns as not nullable and is over conservative. For
1306            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
1307            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
1308            // should be replaced with precise type-level reasoning.
1309            let key_desc = key_desc.map(|desc| {
1310                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1311                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1312                if is_kafka && is_envelope_none {
1313                    RelationDesc::from_names_and_types(
1314                        desc.into_iter()
1315                            .map(|(name, typ)| (name, typ.nullable(true))),
1316                    )
1317                } else {
1318                    desc
1319                }
1320            });
1321            (key_desc, value_desc)
1322        }
1323        None => (key_desc, value_desc),
1324    };
1325
1326    // The KEY VALUE load generator is the only UPSERT source that
1327    // has no encoding but defaults to `INCLUDE KEY`.
1328    //
1329    // As discussed in
1330    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1331    // removing this special case amounts to deciding how to handle null keys
1332    // from sources in a holistic way. We aren't yet prepared to do this, so we
1333    // leave this special case in.
1334    //
1335    // Note that this is safe because this generator is
1336    // 1. The only source with no encoding that can have its key included.
1337    // 2. Never produces null keys (or values, for that matter).
1338    let key_envelope_no_encoding = matches!(
1339        source_connection,
1340        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1341            load_generator: LoadGenerator::KeyValue(_),
1342            ..
1343        })
1344    );
1345    let mut key_envelope = get_key_envelope(
1346        include_metadata,
1347        encoding.as_ref(),
1348        key_envelope_no_encoding,
1349    )?;
1350
1351    match (&envelope, &key_envelope) {
1352        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1353        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1354            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1355        ),
1356        _ => {}
1357    };
1358
1359    // Not all source envelopes are compatible with all source connections.
1360    // Whoever constructs the source ingestion pipeline is responsible for
1361    // choosing compatible envelopes and connections.
1362    //
1363    // TODO(guswynn): unambiguously assert which connections and envelopes are
1364    // compatible during typechecking.
1365    //
1366    // TODO: remove bails as more support for upsert is added.
1367    let envelope = match &envelope {
1368        // TODO: fixup key envelope
1369        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1370        ast::SourceEnvelope::Debezium => {
1371            // TODO: check that the key envelope is not set.
1372            let after_idx = match typecheck_debezium(&value_desc) {
1373                Ok((_before_idx, after_idx)) => Ok(after_idx),
1374                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1375                    Some(DataEncoding::Avro(_)) => Err(type_err),
1376                    _ => Err(sql_err!(
1377                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1378                    )),
1379                },
1380            }?;
1381
1382            UnplannedSourceEnvelope::Upsert {
1383                style: UpsertStyle::Debezium { after_idx },
1384            }
1385        }
1386        ast::SourceEnvelope::Upsert {
1387            value_decode_err_policy,
1388        } => {
1389            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1390                None => {
1391                    if !key_envelope_no_encoding {
1392                        bail_unsupported!(format!(
1393                            "UPSERT requires a key/value format: {:?}",
1394                            format
1395                        ))
1396                    }
1397                    None
1398                }
1399                Some(key_encoding) => Some(key_encoding),
1400            };
1401            // `ENVELOPE UPSERT` implies `INCLUDE KEY` if the key envelope is
1402            // not explicitly specified.
1403            if key_envelope == KeyEnvelope::None {
1404                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1405            }
1406            // If the value decode error policy is not set, we use the default upsert style.
1407            let style = match value_decode_err_policy.as_slice() {
1408                [] => UpsertStyle::Default(key_envelope),
1409                [SourceErrorPolicy::Inline { alias }] => {
1410                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1411                    UpsertStyle::ValueErrInline {
1412                        key_envelope,
1413                        error_column: alias
1414                            .as_ref()
1415                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1416                    }
1417                }
1418                _ => {
1419                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1420                }
1421            };
1422
1423            UnplannedSourceEnvelope::Upsert { style }
1424        }
1425        ast::SourceEnvelope::CdcV2 => {
1426            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1427            // TODO: check that the key envelope is not set.
1428            match format {
1429                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1430                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1431            }
1432            UnplannedSourceEnvelope::CdcV2
1433        }
1434    };
1435
1436    let metadata_desc = included_column_desc(metadata_columns_desc);
1437    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1438
1439    Ok((desc, envelope, encoding))
1440}
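// Illustrative example (not taken from this file): for a statement such as
//
//     CREATE SOURCE kafka_upsert
//       FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
//       KEY FORMAT TEXT VALUE FORMAT JSON
//       ENVELOPE UPSERT;
//
// `apply_source_envelope_encoding` plans the key and value encodings, applies
// the upsert envelope (which implies `INCLUDE KEY`), and returns the resulting
// relation description alongside the planned envelope and encoding.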
1441
1442/// Plans the `RelationDesc` for a source export (subsource or table) that has a
1443/// defined list of columns and constraints.
1444fn plan_source_export_desc(
1445    scx: &StatementContext,
1446    name: &UnresolvedItemName,
1447    columns: &Vec<ColumnDef<Aug>>,
1448    constraints: &Vec<TableConstraint<Aug>>,
1449) -> Result<RelationDesc, PlanError> {
1450    let names: Vec<_> = columns
1451        .iter()
1452        .map(|c| normalize::column_name(c.name.clone()))
1453        .collect();
1454
1455    if let Some(dup) = names.iter().duplicates().next() {
1456        sql_bail!("column {} specified more than once", dup.quoted());
1457    }
1458
1459    // Build initial relation type that handles declared data types
1460    // and NOT NULL constraints.
1461    let mut column_types = Vec::with_capacity(columns.len());
1462    let mut keys = Vec::new();
1463
1464    for (i, c) in columns.into_iter().enumerate() {
1465        let aug_data_type = &c.data_type;
1466        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1467        let mut nullable = true;
1468        for option in &c.options {
1469            match &option.option {
1470                ColumnOption::NotNull => nullable = false,
1471                ColumnOption::Default(_) => {
1472                    bail_unsupported!("Source export with default value")
1473                }
1474                ColumnOption::Unique { is_primary } => {
1475                    keys.push(vec![i]);
1476                    if *is_primary {
1477                        nullable = false;
1478                    }
1479                }
1480                other => {
1481                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1482                }
1483            }
1484        }
1485        column_types.push(ty.nullable(nullable));
1486    }
1487
1488    let mut seen_primary = false;
1489    'c: for constraint in constraints {
1490        match constraint {
1491            TableConstraint::Unique {
1492                name: _,
1493                columns,
1494                is_primary,
1495                nulls_not_distinct,
1496            } => {
1497                if seen_primary && *is_primary {
1498                    sql_bail!(
1499                        "multiple primary keys for source export {} are not allowed",
1500                        name.to_ast_string_stable()
1501                    );
1502                }
1503                seen_primary = *is_primary || seen_primary;
1504
1505                let mut key = vec![];
1506                for column in columns {
1507                    let column = normalize::column_name(column.clone());
1508                    match names.iter().position(|name| *name == column) {
1509                        None => sql_bail!("unknown column in constraint: {}", column),
1510                        Some(i) => {
1511                            let nullable = &mut column_types[i].nullable;
1512                            if *is_primary {
1513                                if *nulls_not_distinct {
1514                                    sql_bail!(
1515                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1516                                    );
1517                                }
1518                                *nullable = false;
1519                            } else if !(*nulls_not_distinct || !*nullable) {
1520                                // Non-primary key unique constraints are only keys if all of their
1521                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1522                                break 'c;
1523                            }
1524
1525                            key.push(i);
1526                        }
1527                    }
1528                }
1529
1530                if *is_primary {
1531                    keys.insert(0, key);
1532                } else {
1533                    keys.push(key);
1534                }
1535            }
1536            TableConstraint::ForeignKey { .. } => {
1537                bail_unsupported!("Source export with a foreign key")
1538            }
1539            TableConstraint::Check { .. } => {
1540                bail_unsupported!("Source export with a check constraint")
1541            }
1542        }
1543    }
1544
1545    let typ = RelationType::new(column_types).with_keys(keys);
1546    let desc = RelationDesc::new(typ, names);
1547    Ok(desc)
1548}
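// For example (illustrative), a column list like
//
//     (a int4, b text, PRIMARY KEY (a))
//
// produces a two-column `RelationDesc` in which `a` is marked non-nullable by
// its primary key constraint and `[a]` is installed as the first key.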
1549
1550generate_extracted_config!(
1551    CreateSubsourceOption,
1552    (Progress, bool, Default(false)),
1553    (ExternalReference, UnresolvedItemName),
1554    (RetainHistory, OptionalDuration),
1555    (TextColumns, Vec::<Ident>, Default(vec![])),
1556    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1557    (Details, String)
1558);
1559
1560pub fn plan_create_subsource(
1561    scx: &StatementContext,
1562    stmt: CreateSubsourceStatement<Aug>,
1563) -> Result<Plan, PlanError> {
1564    let CreateSubsourceStatement {
1565        name,
1566        columns,
1567        of_source,
1568        constraints,
1569        if_not_exists,
1570        with_options,
1571    } = &stmt;
1572
1573    let CreateSubsourceOptionExtracted {
1574        progress,
1575        retain_history,
1576        external_reference,
1577        text_columns,
1578        exclude_columns,
1579        details,
1580        seen: _,
1581    } = with_options.clone().try_into()?;
1582
1583    // This invariant is enforced during purification; we are responsible for
1584    // creating the AST for subsources as a response to CREATE SOURCE
1585    // statements, so this would fire in integration testing if we failed to
1586    // uphold it.
1587    assert!(
1588        progress ^ (external_reference.is_some() && of_source.is_some()),
1589        "CREATE SUBSOURCE statement must specify exactly one of the PROGRESS option or an EXTERNAL REFERENCE"
1590    );
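    // For example (illustrative): a progress subsource carries `PROGRESS = true` and
    // omits the other options, while an export subsource carries both the `EXTERNAL
    // REFERENCE` option and `OF SOURCE` and leaves `PROGRESS` unset.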
1591
1592    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1593
1594    let data_source = if let Some(source_reference) = of_source {
1595        // If the new source table syntax is forced, we should not create any
1596        // non-progress subsources.
1597        if scx.catalog.system_vars().enable_create_table_from_source()
1598            && scx.catalog.system_vars().force_source_table_syntax()
1599        {
1600            Err(PlanError::UseTablesForSources(
1601                "CREATE SUBSOURCE".to_string(),
1602            ))?;
1603        }
1604
1605        // This is a subsource with the "natural" dependency order, i.e. it is
1606        // not a legacy subsource with the inverted structure.
1607        let ingestion_id = *source_reference.item_id();
1608        let external_reference = external_reference.unwrap();
1609
1610        // Decode the details option stored on the subsource statement, which contains information
1611        // created during the purification process.
1612        let details = details
1613            .as_ref()
1614            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1615        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1616        let details =
1617            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1618        let details =
1619            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1620        let details = match details {
1621            SourceExportStatementDetails::Postgres { table } => {
1622                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1623                    column_casts: crate::pure::postgres::generate_column_casts(
1624                        scx,
1625                        &table,
1626                        &text_columns,
1627                    )?,
1628                    table,
1629                })
1630            }
1631            SourceExportStatementDetails::MySql {
1632                table,
1633                initial_gtid_set,
1634            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1635                table,
1636                initial_gtid_set,
1637                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1638                exclude_columns: exclude_columns
1639                    .into_iter()
1640                    .map(|c| c.into_string())
1641                    .collect(),
1642            }),
1643            SourceExportStatementDetails::SqlServer {
1644                table,
1645                capture_instance,
1646                initial_lsn,
1647            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1648                capture_instance,
1649                table,
1650                initial_lsn,
1651                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1652                exclude_columns: exclude_columns
1653                    .into_iter()
1654                    .map(|c| c.into_string())
1655                    .collect(),
1656            }),
1657            SourceExportStatementDetails::LoadGenerator { output } => {
1658                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1659            }
1660            SourceExportStatementDetails::Kafka {} => {
1661                bail_unsupported!("subsources cannot reference Kafka sources")
1662            }
1663        };
1664        DataSourceDesc::IngestionExport {
1665            ingestion_id,
1666            external_reference,
1667            details,
1668            // Subsources don't currently support non-default envelopes / encoding
1669            data_config: SourceExportDataConfig {
1670                envelope: SourceEnvelope::None(NoneEnvelope {
1671                    key_envelope: KeyEnvelope::None,
1672                    key_arity: 0,
1673                }),
1674                encoding: None,
1675            },
1676        }
1677    } else if progress {
1678        DataSourceDesc::Progress
1679    } else {
1680        panic!("subsources must specify either `progress` or both `of_source` and `external_reference`")
1681    };
1682
1683    let if_not_exists = *if_not_exists;
1684    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1685
1686    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1687
1688    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1689    let source = Source {
1690        create_sql,
1691        data_source,
1692        desc,
1693        compaction_window,
1694    };
1695
1696    Ok(Plan::CreateSource(CreateSourcePlan {
1697        name,
1698        source,
1699        if_not_exists,
1700        timeline: Timeline::EpochMilliseconds,
1701        in_cluster: None,
1702    }))
1703}
1704
1705generate_extracted_config!(
1706    TableFromSourceOption,
1707    (TextColumns, Vec::<Ident>, Default(vec![])),
1708    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1709    (PartitionBy, Vec<Ident>),
1710    (RetainHistory, OptionalDuration),
1711    (Details, String)
1712);
1713
1714pub fn plan_create_table_from_source(
1715    scx: &StatementContext,
1716    stmt: CreateTableFromSourceStatement<Aug>,
1717) -> Result<Plan, PlanError> {
1718    if !scx.catalog.system_vars().enable_create_table_from_source() {
1719        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1720    }
1721
1722    let CreateTableFromSourceStatement {
1723        name,
1724        columns,
1725        constraints,
1726        if_not_exists,
1727        source,
1728        external_reference,
1729        envelope,
1730        format,
1731        include_metadata,
1732        with_options,
1733    } = &stmt;
1734
1735    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1736
1737    let TableFromSourceOptionExtracted {
1738        text_columns,
1739        exclude_columns,
1740        retain_history,
1741        partition_by,
1742        details,
1743        seen: _,
1744    } = with_options.clone().try_into()?;
1745
1746    let source_item = scx.get_item_by_resolved_name(source)?;
1747    let ingestion_id = source_item.id();
1748
1749    // Decode the details option stored on the statement, which contains information
1750    // created during the purification process.
1751    let details = details
1752        .as_ref()
1753        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1754    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1755    let details =
1756        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1757    let details =
1758        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1759
1760    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1761        && include_metadata
1762            .iter()
1763            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1764    {
1765        // TODO(guswynn): should this be `bail_unsupported!`?
1766        sql_bail!("INCLUDE HEADERS with a non-Kafka source table is not supported");
1767    }
1768    if !matches!(
1769        details,
1770        SourceExportStatementDetails::Kafka { .. }
1771            | SourceExportStatementDetails::LoadGenerator { .. }
1772    ) && !include_metadata.is_empty()
1773    {
1774        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1775    }
1776
1777    let details = match details {
1778        SourceExportStatementDetails::Postgres { table } => {
1779            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1780                column_casts: crate::pure::postgres::generate_column_casts(
1781                    scx,
1782                    &table,
1783                    &text_columns,
1784                )?,
1785                table,
1786            })
1787        }
1788        SourceExportStatementDetails::MySql {
1789            table,
1790            initial_gtid_set,
1791        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1792            table,
1793            initial_gtid_set,
1794            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1795            exclude_columns: exclude_columns
1796                .into_iter()
1797                .map(|c| c.into_string())
1798                .collect(),
1799        }),
1800        SourceExportStatementDetails::SqlServer {
1801            table,
1802            capture_instance,
1803            initial_lsn,
1804        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1805            table,
1806            capture_instance,
1807            initial_lsn,
1808            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1809            exclude_columns: exclude_columns
1810                .into_iter()
1811                .map(|c| c.into_string())
1812                .collect(),
1813        }),
1814        SourceExportStatementDetails::LoadGenerator { output } => {
1815            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1816        }
1817        SourceExportStatementDetails::Kafka {} => {
1818            if !include_metadata.is_empty()
1819                && !matches!(
1820                    envelope,
1821                    ast::SourceEnvelope::Upsert { .. }
1822                        | ast::SourceEnvelope::None
1823                        | ast::SourceEnvelope::Debezium
1824                )
1825            {
1826                // TODO(guswynn): should this be `bail_unsupported!`?
1827                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1828            }
1829
1830            let metadata_columns = include_metadata
1831                .into_iter()
1832                .flat_map(|item| match item {
1833                    SourceIncludeMetadata::Timestamp { alias } => {
1834                        let name = match alias {
1835                            Some(name) => name.to_string(),
1836                            None => "timestamp".to_owned(),
1837                        };
1838                        Some((name, KafkaMetadataKind::Timestamp))
1839                    }
1840                    SourceIncludeMetadata::Partition { alias } => {
1841                        let name = match alias {
1842                            Some(name) => name.to_string(),
1843                            None => "partition".to_owned(),
1844                        };
1845                        Some((name, KafkaMetadataKind::Partition))
1846                    }
1847                    SourceIncludeMetadata::Offset { alias } => {
1848                        let name = match alias {
1849                            Some(name) => name.to_string(),
1850                            None => "offset".to_owned(),
1851                        };
1852                        Some((name, KafkaMetadataKind::Offset))
1853                    }
1854                    SourceIncludeMetadata::Headers { alias } => {
1855                        let name = match alias {
1856                            Some(name) => name.to_string(),
1857                            None => "headers".to_owned(),
1858                        };
1859                        Some((name, KafkaMetadataKind::Headers))
1860                    }
1861                    SourceIncludeMetadata::Header {
1862                        alias,
1863                        key,
1864                        use_bytes,
1865                    } => Some((
1866                        alias.to_string(),
1867                        KafkaMetadataKind::Header {
1868                            key: key.clone(),
1869                            use_bytes: *use_bytes,
1870                        },
1871                    )),
1872                    SourceIncludeMetadata::Key { .. } => {
1873                        // handled below
1874                        None
1875                    }
1876                })
1877                .collect();
1878
1879            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1880        }
1881    };
1882
1883    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1884
1885    // Some source types (e.g. Postgres, MySQL, multi-output load generator sources) define a
1886    // value schema during purification and populate the statement's `columns` and `constraints`
1887    // fields, whereas other source types (e.g. Kafka, single-output load generator sources) do
1888    // not; for those we instead use the source connection's default schema.
1889    let (key_desc, value_desc) =
1890        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1891            let columns = match columns {
1892                TableFromSourceColumns::Defined(columns) => columns,
1893                _ => unreachable!(),
1894            };
1895            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1896            (None, desc)
1897        } else {
1898            let key_desc = source_connection.default_key_desc();
1899            let value_desc = source_connection.default_value_desc();
1900            (Some(key_desc), value_desc)
1901        };
1902
1903    let metadata_columns_desc = match &details {
1904        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1905            metadata_columns, ..
1906        }) => kafka_metadata_columns_desc(metadata_columns),
1907        _ => vec![],
1908    };
1909
1910    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1911        scx,
1912        &envelope,
1913        format,
1914        key_desc,
1915        value_desc,
1916        include_metadata,
1917        metadata_columns_desc,
1918        source_connection,
1919    )?;
1920    if let TableFromSourceColumns::Named(col_names) = columns {
1921        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1922    }
1923
1924    let names: Vec<_> = desc.iter_names().cloned().collect();
1925    if let Some(dup) = names.iter().duplicates().next() {
1926        sql_bail!("column {} specified more than once", dup.quoted());
1927    }
1928
1929    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1930
1931    // Determine the timeline for the table: `ENVELOPE MATERIALIZE` (CdcV2) runs on its
1932    // own external timeline, while everything else uses the epoch-milliseconds timeline.
1933    let timeline = match envelope {
1934        SourceEnvelope::CdcV2 => {
1935            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1936        }
1937        _ => Timeline::EpochMilliseconds,
1938    };
1939
1940    if let Some(partition_by) = partition_by {
1941        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1942        check_partition_by(&desc, partition_by)?;
1943    }
1944
1945    let data_source = DataSourceDesc::IngestionExport {
1946        ingestion_id,
1947        external_reference: external_reference
1948            .as_ref()
1949            .expect("populated in purification")
1950            .clone(),
1951        details,
1952        data_config: SourceExportDataConfig { envelope, encoding },
1953    };
1954
1955    let if_not_exists = *if_not_exists;
1956
1957    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1958
1959    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1960    let table = Table {
1961        create_sql,
1962        desc: VersionedRelationDesc::new(desc),
1963        temporary: false,
1964        compaction_window,
1965        data_source: TableDataSource::DataSource {
1966            desc: data_source,
1967            timeline,
1968        },
1969    };
1970
1971    Ok(Plan::CreateTable(CreateTablePlan {
1972        name,
1973        table,
1974        if_not_exists,
1975    }))
1976}
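// Illustrative example (assuming the feature flag above is enabled): a statement
// like
//
//     CREATE TABLE pg_tbl FROM SOURCE pg_source (REFERENCE public.tbl);
//
// reaches this function after purification has populated the `DETAILS` and
// `EXTERNAL REFERENCE` options, so planning only needs to decode them and
// assemble the `CreateTablePlan`.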
1977
1978generate_extracted_config!(
1979    LoadGeneratorOption,
1980    (TickInterval, Duration),
1981    (AsOf, u64, Default(0_u64)),
1982    (UpTo, u64, Default(u64::MAX)),
1983    (ScaleFactor, f64),
1984    (MaxCardinality, u64),
1985    (Keys, u64),
1986    (SnapshotRounds, u64),
1987    (TransactionalSnapshot, bool),
1988    (ValueSize, u64),
1989    (Seed, u64),
1990    (Partitions, u64),
1991    (BatchSize, u64)
1992);
1993
1994impl LoadGeneratorOptionExtracted {
1995    pub(super) fn ensure_only_valid_options(
1996        &self,
1997        loadgen: &ast::LoadGenerator,
1998    ) -> Result<(), PlanError> {
1999        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2000
2001        let mut options = self.seen.clone();
2002
2003        let permitted_options: &[_] = match loadgen {
2004            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2005            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2006            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2007            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2008            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2009            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2010            ast::LoadGenerator::KeyValue => &[
2011                TickInterval,
2012                Keys,
2013                SnapshotRounds,
2014                TransactionalSnapshot,
2015                ValueSize,
2016                Seed,
2017                Partitions,
2018                BatchSize,
2019            ],
2020        };
2021
2022        for o in permitted_options {
2023            options.remove(o);
2024        }
2025
2026        if !options.is_empty() {
2027            sql_bail!(
2028                "{} load generators do not support {} values",
2029                loadgen,
2030                options.iter().join(", ")
2031            )
2032        }
2033
2034        Ok(())
2035    }
2036}
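// For example (illustrative), `CREATE SOURCE c FROM LOAD GENERATOR COUNTER
// (SCALE FACTOR 1)` is rejected by the check above, because COUNTER permits
// only TICK INTERVAL, AS OF, UP TO, and MAX CARDINALITY.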
2037
2038pub(crate) fn load_generator_ast_to_generator(
2039    scx: &StatementContext,
2040    loadgen: &ast::LoadGenerator,
2041    options: &[LoadGeneratorOption<Aug>],
2042    include_metadata: &[SourceIncludeMetadata],
2043) -> Result<LoadGenerator, PlanError> {
2044    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2045    extracted.ensure_only_valid_options(loadgen)?;
2046
2047    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2048        sql_bail!("INCLUDE metadata is only supported with `KEY VALUE` load generators");
2049    }
2050
2051    let load_generator = match loadgen {
2052        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2053        ast::LoadGenerator::Clock => {
2054            scx.require_feature_flag(&crate::session::vars::ENABLE_CLOCK_LOAD_GENERATOR)?;
2055            LoadGenerator::Clock
2056        }
2057        ast::LoadGenerator::Counter => {
2058            let LoadGeneratorOptionExtracted {
2059                max_cardinality, ..
2060            } = extracted;
2061            LoadGenerator::Counter { max_cardinality }
2062        }
2063        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2064        ast::LoadGenerator::Datums => LoadGenerator::Datums,
2065        ast::LoadGenerator::Tpch => {
2066            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2067
2068            // Default to 0.01 scale factor (=10MB).
2069            let sf: f64 = scale_factor.unwrap_or(0.01);
2070            if !sf.is_finite() || sf < 0.0 {
2071                sql_bail!("unsupported scale factor {sf}");
2072            }
2073
2074            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2075                let total = (sf * multiplier).floor();
2076                let mut i = i64::try_cast_from(total)
2077                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2078                if i < 1 {
2079                    i = 1;
2080                }
2081                Ok(i)
2082            };
2083
2084            // The multiplications here are safely unchecked because they will
2085            // overflow to infinity, which will be caught by the `try_cast_from` in `f_to_i`.
2086            let count_supplier = f_to_i(10_000f64)?;
2087            let count_part = f_to_i(200_000f64)?;
2088            let count_customer = f_to_i(150_000f64)?;
2089            let count_orders = f_to_i(150_000f64 * 10f64)?;
2090            let count_clerk = f_to_i(1_000f64)?;
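            // Worked example (illustrative): with the default `sf` of 0.01 these
            // come out to 100 suppliers, 2_000 parts, 1_500 customers, 15_000
            // orders, and 10 clerks.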
2091
2092            LoadGenerator::Tpch {
2093                count_supplier,
2094                count_part,
2095                count_customer,
2096                count_orders,
2097                count_clerk,
2098            }
2099        }
2100        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2101            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2102            let LoadGeneratorOptionExtracted {
2103                keys,
2104                snapshot_rounds,
2105                transactional_snapshot,
2106                value_size,
2107                tick_interval,
2108                seed,
2109                partitions,
2110                batch_size,
2111                ..
2112            } = extracted;
2113
2114            let mut include_offset = None;
2115            for im in include_metadata {
2116                match im {
2117                    SourceIncludeMetadata::Offset { alias } => {
2118                        include_offset = match alias {
2119                            Some(alias) => Some(alias.to_string()),
2120                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2121                        }
2122                    }
2123                    SourceIncludeMetadata::Key { .. } => continue,
2124
2125                    _ => {
2126                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2127                    }
2128                };
2129            }
2130
2131            let lgkv = KeyValueLoadGenerator {
2132                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2133                snapshot_rounds: snapshot_rounds
2134                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2135                // Defaults to true.
2136                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2137                value_size: value_size
2138                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2139                partitions: partitions
2140                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2141                tick_interval,
2142                batch_size: batch_size
2143                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2144                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2145                include_offset,
2146            };
2147
2148            if lgkv.keys == 0
2149                || lgkv.partitions == 0
2150                || lgkv.value_size == 0
2151                || lgkv.batch_size == 0
2152            {
2153                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2154            }
2155
2156            if lgkv.keys % lgkv.partitions != 0 {
2157                sql_bail!("KEYS must be a multiple of PARTITIONS")
2158            }
2159
2160            if lgkv.batch_size > lgkv.keys {
2161                sql_bail!("BATCH SIZE cannot be larger than KEYS")
2162            }
2163
2164            // This constraint simplifies the source implementation.
2165            // We can lift it later.
2166            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2167                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2168            }
2169
2170            if lgkv.snapshot_rounds == 0 {
2171                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2172            }
2173
2174            LoadGenerator::KeyValue(lgkv)
2175        }
2176    };
2177
2178    Ok(load_generator)
2179}
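// Illustrative example satisfying the KEY VALUE constraints above:
//
//     CREATE SOURCE kv FROM LOAD GENERATOR KEY VALUE (
//         KEYS 128, PARTITIONS 4, BATCH SIZE 8, VALUE SIZE 64,
//         SEED 1, SNAPSHOT ROUNDS 1
//     );
//
// 128 % 4 = 0 and (128 / 4) % 8 = 0, so every check passes and planning
// succeeds.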
2180
2181fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2182    let before = value_desc.get_by_name(&"before".into());
2183    let (after_idx, after_ty) = value_desc
2184        .get_by_name(&"after".into())
2185        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2186    let before_idx = if let Some((before_idx, before_ty)) = before {
2187        if !matches!(before_ty.scalar_type, ScalarType::Record { .. }) {
2188            sql_bail!("'before' column must be of type record");
2189        }
2190        if before_ty != after_ty {
2191            sql_bail!("'before' column type differs from 'after' column type");
2192        }
2193        Some(before_idx)
2194    } else {
2195        None
2196    };
2197    Ok((before_idx, after_idx))
2198}
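// For reference (illustrative), a conforming Debezium value record has the shape
//
//     {"before": {...} | null, "after": {...} | null}
//
// where `before` and `after` share the same record type; `typecheck_debezium`
// returns the (optional) index of `before` and the index of `after`.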
2199
2200fn get_encoding(
2201    scx: &StatementContext,
2202    format: &FormatSpecifier<Aug>,
2203    envelope: &ast::SourceEnvelope,
2204) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2205    let encoding = match format {
2206        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2207        FormatSpecifier::KeyValue { key, value } => {
2208            let key = {
2209                let encoding = get_encoding_inner(scx, key)?;
2210                Some(encoding.key.unwrap_or(encoding.value))
2211            };
2212            let value = get_encoding_inner(scx, value)?.value;
2213            SourceDataEncoding { key, value }
2214        }
2215    };
2216
2217    let requires_keyvalue = matches!(
2218        envelope,
2219        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2220    );
2221    let is_keyvalue = encoding.key.is_some();
2222    if requires_keyvalue && !is_keyvalue {
2223        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2224    };
2225
2226    Ok(encoding)
2227}
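// For example (illustrative), `KEY FORMAT TEXT VALUE FORMAT JSON` produces a
// `SourceDataEncoding` with both key and value populated, which satisfies the
// key/value requirement of `ENVELOPE UPSERT`; a bare `FORMAT JSON` under
// `ENVELOPE UPSERT` is rejected by the check above.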
2228
2229/// Determine the cluster ID to use for this item.
2230///
2231/// If `in_cluster` is `None`, we will update it to refer to the default cluster.
2232/// Because of this, do not normalize/canonicalize the create SQL statement
2233/// until after calling this function.
2234fn source_sink_cluster_config<'a, 'ctx>(
2235    scx: &'a StatementContext<'ctx>,
2236    in_cluster: &mut Option<ResolvedClusterName>,
2237) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2238    let cluster = match in_cluster {
2239        None => {
2240            let cluster = scx.catalog.resolve_cluster(None)?;
2241            *in_cluster = Some(ResolvedClusterName {
2242                id: cluster.id(),
2243                print_name: None,
2244            });
2245            cluster
2246        }
2247        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2248    };
2249
2250    Ok(cluster)
2251}
2252
2253generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2254
2255#[derive(Debug)]
2256pub struct Schema {
2257    pub key_schema: Option<String>,
2258    pub value_schema: String,
2259    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2260    pub confluent_wire_format: bool,
2261}
2262
2263fn get_encoding_inner(
2264    scx: &StatementContext,
2265    format: &Format<Aug>,
2266) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2267    let value = match format {
2268        Format::Bytes => DataEncoding::Bytes,
2269        Format::Avro(schema) => {
2270            let Schema {
2271                key_schema,
2272                value_schema,
2273                csr_connection,
2274                confluent_wire_format,
2275            } = match schema {
2276                // TODO(jldlaughlin): we need a way to pass in primary key information
2277                // when building a source from a string or file.
2278                AvroSchema::InlineSchema {
2279                    schema: ast::Schema { schema },
2280                    with_options,
2281                } => {
2282                    let AvroSchemaOptionExtracted {
2283                        confluent_wire_format,
2284                        ..
2285                    } = with_options.clone().try_into()?;
2286
2287                    Schema {
2288                        key_schema: None,
2289                        value_schema: schema.clone(),
2290                        csr_connection: None,
2291                        confluent_wire_format,
2292                    }
2293                }
2294                AvroSchema::Csr {
2295                    csr_connection:
2296                        CsrConnectionAvro {
2297                            connection,
2298                            seed,
2299                            key_strategy: _,
2300                            value_strategy: _,
2301                        },
2302                } => {
2303                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2304                    let csr_connection = match item.connection()? {
2305                        Connection::Csr(_) => item.id(),
2306                        _ => {
2307                            sql_bail!(
2308                                "{} is not a schema registry connection",
2309                                scx.catalog
2310                                    .resolve_full_name(item.name())
2311                                    .to_string()
2312                                    .quoted()
2313                            )
2314                        }
2315                    };
2316
2317                    if let Some(seed) = seed {
2318                        Schema {
2319                            key_schema: seed.key_schema.clone(),
2320                            value_schema: seed.value_schema.clone(),
2321                            csr_connection: Some(csr_connection),
2322                            confluent_wire_format: true,
2323                        }
2324                    } else {
2325                        unreachable!("CSR seed resolution should already have been called: Avro")
2326                    }
2327                }
2328            };
2329
2330            if let Some(key_schema) = key_schema {
2331                return Ok(SourceDataEncoding {
2332                    key: Some(DataEncoding::Avro(AvroEncoding {
2333                        schema: key_schema,
2334                        csr_connection: csr_connection.clone(),
2335                        confluent_wire_format,
2336                    })),
2337                    value: DataEncoding::Avro(AvroEncoding {
2338                        schema: value_schema,
2339                        csr_connection,
2340                        confluent_wire_format,
2341                    }),
2342                });
2343            } else {
2344                DataEncoding::Avro(AvroEncoding {
2345                    schema: value_schema,
2346                    csr_connection,
2347                    confluent_wire_format,
2348                })
2349            }
2350        }
2351        Format::Protobuf(schema) => match schema {
2352            ProtobufSchema::Csr {
2353                csr_connection:
2354                    CsrConnectionProtobuf {
2355                        connection:
2356                            CsrConnection {
2357                                connection,
2358                                options,
2359                            },
2360                        seed,
2361                    },
2362            } => {
2363                if let Some(CsrSeedProtobuf { key, value }) = seed {
2364                    let item = scx.get_item_by_resolved_name(connection)?;
2365                    let _ = match item.connection()? {
2366                        Connection::Csr(connection) => connection,
2367                        _ => {
2368                            sql_bail!(
2369                                "{} is not a schema registry connection",
2370                                scx.catalog
2371                                    .resolve_full_name(item.name())
2372                                    .to_string()
2373                                    .quoted()
2374                            )
2375                        }
2376                    };
2377
2378                    if !options.is_empty() {
2379                        sql_bail!("Protobuf CSR connections do not support any options");
2380                    }
2381
2382                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2383                        descriptors: strconv::parse_bytes(&value.schema)?,
2384                        message_name: value.message_name.clone(),
2385                        confluent_wire_format: true,
2386                    });
2387                    if let Some(key) = key {
2388                        return Ok(SourceDataEncoding {
2389                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2390                                descriptors: strconv::parse_bytes(&key.schema)?,
2391                                message_name: key.message_name.clone(),
2392                                confluent_wire_format: true,
2393                            })),
2394                            value,
2395                        });
2396                    }
2397                    value
2398                } else {
2399                    unreachable!("CSR seed resolution should already have been called: Proto")
2400                }
2401            }
2402            ProtobufSchema::InlineSchema {
2403                message_name,
2404                schema: ast::Schema { schema },
2405            } => {
2406                let descriptors = strconv::parse_bytes(schema)?;
2407
2408                DataEncoding::Protobuf(ProtobufEncoding {
2409                    descriptors,
2410                    message_name: message_name.to_owned(),
2411                    confluent_wire_format: false,
2412                })
2413            }
2414        },
2415        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2416            regex: mz_repr::adt::regex::Regex::new(regex, false)
2417                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2418        }),
2419        Format::Csv { columns, delimiter } => {
2420            let columns = match columns {
2421                CsvColumns::Header { names } => {
2422                    if names.is_empty() {
2423                        sql_bail!("[internal error] column spec should get names in purify")
2424                    }
2425                    ColumnSpec::Header {
2426                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2427                    }
2428                }
2429                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2430            };
2431            DataEncoding::Csv(CsvEncoding {
2432                columns,
2433                delimiter: u8::try_from(*delimiter)
2434                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2435            })
2436        }
2437        Format::Json { array: false } => DataEncoding::Json,
2438        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2439        Format::Text => DataEncoding::Text,
2440    };
2441    Ok(SourceDataEncoding { key: None, value })
2442}
2443
2444/// Extract the key envelope, if it is requested
2445fn get_key_envelope(
2446    included_items: &[SourceIncludeMetadata],
2447    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2448    key_envelope_no_encoding: bool,
2449) -> Result<KeyEnvelope, PlanError> {
2450    let key_definition = included_items
2451        .iter()
2452        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2453    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2454        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2455            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2456            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2457            (Some(name), _) if key_envelope_no_encoding => {
2458                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2459            }
2460            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2461            (_, None) => {
2462                // `alias == None` means `INCLUDE KEY`;
2463                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
2464                // Both cases make sense with the same error message.
2465                sql_bail!(
2466                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2467                        got bare FORMAT"
2468                );
2469            }
2470        }
2471    } else {
2472        Ok(KeyEnvelope::None)
2473    }
2474}
2475
2476/// Gets the key envelope for a given key encoding when no name for the key has
2477/// been requested by the user.
2478fn get_unnamed_key_envelope(
2479    key: Option<&DataEncoding<ReferencedConnection>>,
2480) -> Result<KeyEnvelope, PlanError> {
2481    // If the key is requested but comes from an unnamed type, then it gets the name "key".
2482    //
2483    // Otherwise it gets the names of the columns in the type.
2484    let is_composite = match key {
2485        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2486        Some(
2487            DataEncoding::Avro(_)
2488            | DataEncoding::Csv(_)
2489            | DataEncoding::Protobuf(_)
2490            | DataEncoding::Regex { .. },
2491        ) => true,
2492        None => false,
2493    };
2494
2495    if is_composite {
2496        Ok(KeyEnvelope::Flattened)
2497    } else {
2498        Ok(KeyEnvelope::Named("key".to_string()))
2499    }
2500}
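// For example (illustrative): a `TEXT`-encoded key yields a single column named
// "key", while an Avro record key is flattened into one column per record field.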
2501
2502pub fn describe_create_view(
2503    _: &StatementContext,
2504    _: CreateViewStatement<Aug>,
2505) -> Result<StatementDesc, PlanError> {
2506    Ok(StatementDesc::new(None))
2507}
2508
2509pub fn plan_view(
2510    scx: &StatementContext,
2511    def: &mut ViewDefinition<Aug>,
2512    temporary: bool,
2513) -> Result<(QualifiedItemName, View), PlanError> {
2514    let create_sql = normalize::create_statement(
2515        scx,
2516        Statement::CreateView(CreateViewStatement {
2517            if_exists: IfExistsBehavior::Error,
2518            temporary,
2519            definition: def.clone(),
2520        }),
2521    )?;
2522
2523    let ViewDefinition {
2524        name,
2525        columns,
2526        query,
2527    } = def;
2528
2529    let query::PlannedRootQuery {
2530        expr,
2531        mut desc,
2532        finishing,
2533        scope: _,
2534    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2535    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2536    // Note: Earlier, we considered persisting the finishing information with the view
2537    // here to help with database-issues#236. However, in the meantime, there might be a
2538    // better approach to solving database-issues#236:
2539    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2540    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2541        &finishing,
2542        expr.arity()
2543    ));
2544    if expr.contains_parameters()? {
2545        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2546    }
2547
2548    let dependencies = expr
2549        .depends_on()
2550        .into_iter()
2551        .map(|gid| scx.catalog.resolve_item_id(&gid))
2552        .collect();
2553
2554    let name = if temporary {
2555        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2556    } else {
2557        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2558    };
2559
2560    plan_utils::maybe_rename_columns(
2561        format!("view {}", scx.catalog.resolve_full_name(&name)),
2562        &mut desc,
2563        columns,
2564    )?;
2565    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2566
2567    if let Some(dup) = names.iter().duplicates().next() {
2568        sql_bail!("column {} specified more than once", dup.quoted());
2569    }
2570
2571    let view = View {
2572        create_sql,
2573        expr,
2574        dependencies,
2575        column_names: names,
2576        temporary,
2577    };
2578
2579    Ok((name, view))
2580}
2581
2582pub fn plan_create_view(
2583    scx: &StatementContext,
2584    mut stmt: CreateViewStatement<Aug>,
2585) -> Result<Plan, PlanError> {
2586    let CreateViewStatement {
2587        temporary,
2588        if_exists,
2589        definition,
2590    } = &mut stmt;
2591    let (name, view) = plan_view(scx, definition, *temporary)?;
2592
2593    // Override the statement-level IfExistsBehavior with Skip if this is
2594    // explicitly requested in the PlanContext (the default is `false`).
2595    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2596
2597    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2598        let if_exists = true;
2599        let cascade = false;
2600        let maybe_item_to_drop = plan_drop_item(
2601            scx,
2602            ObjectType::View,
2603            if_exists,
2604            definition.name.clone(),
2605            cascade,
2606        )?;
2607
2608        // Check if the new View depends on the item that we would be replacing.
2609        if let Some(id) = maybe_item_to_drop {
2610            let dependencies = view.expr.depends_on();
2611            let invalid_drop = scx
2612                .get_item(&id)
2613                .global_ids()
2614                .any(|gid| dependencies.contains(&gid));
2615            if invalid_drop {
2616                let item = scx.catalog.get_item(&id);
2617                sql_bail!(
2618                    "cannot replace view {0}: depended upon by new {0} definition",
2619                    scx.catalog.resolve_full_name(item.name())
2620                );
2621            }
2622
2623            Some(id)
2624        } else {
2625            None
2626        }
2627    } else {
2628        None
2629    };
2630    let drop_ids = replace
2631        .map(|id| {
2632            scx.catalog
2633                .item_dependents(id)
2634                .into_iter()
2635                .map(|id| id.unwrap_item_id())
2636                .collect()
2637        })
2638        .unwrap_or_default();
2639
2640    // Check for an object in the catalog with this same name
2641    let full_name = scx.catalog.resolve_full_name(&name);
2642    let partial_name = PartialItemName::from(full_name.clone());
2643    // For PostgreSQL compatibility, we need to prevent creating views when
2644    // there is an existing object *or* type of the same name.
2645    if let (Ok(item), IfExistsBehavior::Error, false) = (
2646        scx.catalog.resolve_item_or_type(&partial_name),
2647        *if_exists,
2648        ignore_if_exists_errors,
2649    ) {
2650        return Err(PlanError::ItemAlreadyExists {
2651            name: full_name.to_string(),
2652            item_type: item.item_type(),
2653        });
2654    }
2655
2656    Ok(Plan::CreateView(CreateViewPlan {
2657        name,
2658        view,
2659        replace,
2660        drop_ids,
2661        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2662        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2663    }))
2664}
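// Illustrative example: `CREATE OR REPLACE VIEW v AS SELECT ... FROM v` is
// rejected by the dependency check above, because the new definition would
// depend on the very item being replaced.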

pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // We get back a trivial finishing, see comment in `plan_view`.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &RelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &ScalarType::MzTimestamp,
                    )?;
                    // (mz_now was purified away to a literal earlier)
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        // This limitation exists because we want Intervals to be cleanly
                        // convertible to a unix epoch timestamp difference. That is no
                        // longer true once the interval involves months, because months
                        // have variable lengths.
                        // See `Timestamp::round_up`.
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    // Desugar the `aligned_to` expression
                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &RelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &ScalarType::MzTimestamp,
                    )?;
                    // (mz_now was purified away to a literal earlier)
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };
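
    // An illustrative statement exercising the schedule planning above
    // (REFRESH options per the match arms handled in this block):
    //
    //     CREATE MATERIALIZED VIEW mv
    //     WITH (REFRESH EVERY '1 day' ALIGNED TO '2024-01-01 00:00:00')
    //     AS SELECT ...;
    //
    // By this point purification has replaced any `mz_now()` inside REFRESH AT
    // / ALIGNED TO with a literal, which is why only literal mz_timestamps are
    // accepted here.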

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Override the statement-level IfExistsBehavior with Skip if this is
    // explicitly requested in the PlanContext (the default is `false`).
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.into(),
                cascade,
            )?;

            // Check if the new Materialized View depends on the item that we would be replacing.
            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating materialized
    // views when there is an existing object *or* type of the same name.
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
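
// Rough sketch of what the invocation above expands to, mirroring the
// hand-written `CsrConfigOptionExtracted` below (a sketch of the macro's
// conventional shape, not its exact output):
//
//     #[derive(Debug, Default)]
//     struct MaterializedViewOptionExtracted {
//         seen: BTreeSet<MaterializedViewOptionName>,
//         assert_not_null: Vec<Ident>,           // AllowMultiple
//         partition_by: Option<Vec<Ident>>,
//         retain_history: Option<OptionalDuration>,
//         refresh: Vec<RefreshOptionValue<Aug>>, // AllowMultiple
//     }
//
// plus a `TryFrom<Vec<MaterializedViewOption<Aug>>>` impl that fills the
// struct and rejects any option specified more than once.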

pub fn plan_create_continual_task(
    scx: &StatementContext,
    mut stmt: CreateContinualTaskStatement<Aug>,
) -> Result<Plan, PlanError> {
    match &stmt.sugar {
        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
        }
        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
        }
    };
    let cluster_id = match &stmt.in_cluster {
        None => scx.catalog.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;

    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;

    // It seems desirable for a CT that e.g. simply filters the input to keep
    // the same nullability. So, start by assuming all columns are non-nullable,
    // and then make them nullable below if any of the exprs plan them as
    // nullable.
    let mut desc = match stmt.columns {
        None => None,
        Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
            for col in columns.iter() {
                desc_columns.push((
                    normalize::column_name(col.name.clone()),
                    ColumnType {
                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
                        nullable: false,
                    },
                ));
            }
            Some(RelationDesc::from_names_and_types(desc_columns))
        }
    };
    let input = scx.get_item_by_resolved_name(&stmt.input)?;
    match input.item_type() {
        // Input must be a thing directly backed by a persist shard, so we can
        // use a persist listen to efficiently rehydrate.
        CatalogItemType::ContinualTask
        | CatalogItemType::Table
        | CatalogItemType::MaterializedView
        | CatalogItemType::Source => {}
        CatalogItemType::Sink
        | CatalogItemType::View
        | CatalogItemType::Index
        | CatalogItemType::Type
        | CatalogItemType::Func
        | CatalogItemType::Secret
        | CatalogItemType::Connection => {
            sql_bail!(
                "CONTINUAL TASK cannot use {} as an input",
                input.item_type()
            );
        }
    }

    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
    let ct_name = stmt.name;
    let placeholder_id = match &ct_name {
        ResolvedItemName::ContinualTask { id, name } => {
            let desc = match desc.as_ref().cloned() {
                Some(x) => x,
                None => {
                    // The user didn't specify the CT's columns. Take a wild
                    // guess that the CT has the same shape as the input. It's
                    // fine if this is wrong, we'll get an error below after
                    // planning the query.
                    let input_name = scx.catalog.resolve_full_name(input.name());
                    input.desc(&input_name)?.into_owned()
                }
            };
            qcx.ctes.insert(
                *id,
                CteDesc {
                    name: name.item.clone(),
                    desc,
                },
            );
            Some(*id)
        }
        _ => None,
    };

    let mut exprs = Vec::new();
    for (idx, stmt) in stmt.stmts.iter().enumerate() {
        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
        let query::PlannedRootQuery {
            expr,
            desc: desc_query,
            finishing,
            scope: _,
        } = query::plan_ct_query(&mut qcx, query)?;
        // We get back a trivial finishing because we plan with a "maintained"
        // QueryLifetime, see comment in `plan_view`.
        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
            &finishing,
            expr.arity()
        ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
        let expr = match desc.as_mut() {
            None => {
                desc = Some(desc_query);
                expr
            }
            Some(desc) => {
                // We construct the DELETE queries' columns from the task itself,
                // so any arity mismatch here must come from an INSERT.
                if desc_query.arity() > desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more expressions than target columns",
                        idx
                    );
                }
                if desc_query.arity() < desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more target columns than expressions",
                        idx
                    );
                }
                // Ensure the types of the source query match the types of the target table,
                // installing assignment casts where necessary and possible.
                let target_types = desc.iter_types().map(|x| &x.scalar_type);
                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
                let expr = expr.map_err(|e| {
                    sql_err!(
                        "statement {}: column {} is of type {} but expression is of type {}",
                        idx,
                        desc.get_name(e.column).quoted(),
                        qcx.humanize_scalar_type(&e.target_type, false),
                        qcx.humanize_scalar_type(&e.source_type, false),
                    )
                })?;

                // Update CT nullability as necessary. The arity checks above
                // verified that the two types have the same length.
                let zip_types = || desc.iter_types().zip(desc_query.iter_types());
                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
                if updated {
                    let new_types = zip_types().map(|(ct, q)| {
                        let mut ct = ct.clone();
                        if q.nullable {
                            ct.nullable = true;
                        }
                        ct
                    });
                    *desc = RelationDesc::from_names_and_types(
                        desc.iter_names().cloned().zip(new_types),
                    );
                }

                expr
            }
        };
        match stmt {
            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
        }
    }
    // TODO(ct3): Collect things by output and assert that there is only one (or
    // support multiple outputs).
    let expr = exprs
        .into_iter()
        .reduce(|acc, expr| acc.union(expr))
        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Check for an object in the catalog with this same name
    let name = match &ct_name {
        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
        ResolvedItemName::ContinualTask { name, .. } => {
            let name = scx.allocate_qualified_name(name.clone())?;
            let full_name = scx.catalog.resolve_full_name(&name);
            let partial_name = PartialItemName::from(full_name.clone());
            // For PostgreSQL compatibility, we need to prevent creating this when there
            // is an existing object *or* type of the same name.
            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
                return Err(PlanError::ItemAlreadyExists {
                    name: full_name.to_string(),
                    item_type: item.item_type(),
                });
            }
            name
        }
        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
        name,
        placeholder_id,
        desc,
        input_id: input.global_id(),
        with_snapshot: snapshot.unwrap_or(true),
        continual_task: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            cluster_id,
            non_null_assertions: Vec::new(),
            compaction_window: None,
            refresh_schedule: None,
            as_of,
        },
    }))
}

fn continual_task_query<'a>(
    ct_name: &ResolvedItemName,
    stmt: &'a ast::ContinualTaskStmt<Aug>,
) -> Option<ast::Query<Aug>> {
    match stmt {
        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
            table_name: _,
            columns,
            source,
            returning,
        }) => {
            if !columns.is_empty() || !returning.is_empty() {
                return None;
            }
            match source {
                ast::InsertSource::Query(query) => Some(query.clone()),
                ast::InsertSource::DefaultValues => None,
            }
        }
        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
            table_name: _,
            alias,
            using,
            selection,
        }) => {
            if !using.is_empty() {
                return None;
            }
            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
            let from = ast::TableWithJoins {
                relation: ast::TableFactor::Table {
                    name: ct_name.clone(),
                    alias: alias.clone(),
                },
                joins: Vec::new(),
            };
            let select = ast::Select {
                from: vec![from],
                selection: selection.clone(),
                distinct: None,
                projection: vec![ast::SelectItem::Wildcard],
                group_by: Vec::new(),
                having: None,
                qualify: None,
                options: Vec::new(),
            };
            let query = ast::Query {
                ctes: ast::CteBlock::Simple(Vec::new()),
                body: ast::SetExpr::Select(Box::new(select)),
                order_by: Vec::new(),
                limit: None,
                offset: None,
            };
            // Then negate it to turn it into retractions (after planning it).
            Some(query)
        }
    }
}
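
// Sketch of the desugaring above: a task statement like
//
//     DELETE FROM ct WHERE retired
//
// becomes `SELECT * FROM ct WHERE retired`, and the planned result is negated
// by the caller (`expr.negate()`), turning the matching rows into retractions.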

generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));

pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64)
);

pub fn plan_create_sink(
    scx: &StatementContext,
    stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Check for an object in the catalog with this same name
    let Some(name) = stmt.name.clone() else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    plan_sink(scx, stmt)
}

/// Plans a sink as if it does not already exist in the catalog, so that the
/// planning logic can be shared by CREATE SINK and ALTER SINK. It is the
/// responsibility of the callers (plan_create_sink and plan_alter_sink) to
/// check for name collisions where that matters.
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let envelope = match envelope {
        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
        None => sql_bail!("ENVELOPE clause is required"),
    };

    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;
    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }
    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key, .. } => {
            if let Some(key) = key.clone() {
                let key_columns = key
                    .key_columns
                    .into_iter()
                    .map(normalize::column_name)
                    .collect::<Vec<_>>();
                let mut uniq = BTreeSet::new();
                for col in key_columns.iter() {
                    if !uniq.insert(col) {
                        sql_bail!("duplicate column referenced in KEY: {}", col);
                    }
                }
                let indices = key_columns
                    .iter()
                    .map(|col| -> anyhow::Result<usize> {
                        let name_idx =
                            desc.get_by_name(col)
                                .map(|(idx, _type)| idx)
                                .ok_or_else(|| {
                                    sql_err!("column referenced in KEY does not exist: {}", col)
                                })?;
                        if desc.get_unambiguous_name(name_idx).is_none() {
                            // `sql_err!` only constructs the error; return it
                            // explicitly to actually reject the ambiguous key.
                            return Err(
                                sql_err!("column referenced in KEY is ambiguous: {}", col).into(),
                            );
                        }
                        Ok(name_idx)
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                let is_valid_key =
                    desc.typ().keys.iter().any(|key_columns| {
                        key_columns.iter().all(|column| indices.contains(column))
                    });

                if !is_valid_key && envelope == SinkEnvelope::Upsert {
                    if key.not_enforced {
                        scx.catalog
                            .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                                key: key_columns.clone(),
                                name: name.item.clone(),
                            })
                    } else {
                        return Err(PlanError::UpsertSinkWithInvalidKey {
                            name: from_name.full_name_str(),
                            desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                            valid_keys: desc
                                .typ()
                                .keys
                                .iter()
                                .map(|key| {
                                    key.iter()
                                        .map(|col| desc.get_name(*col).as_str().into())
                                        .collect()
                                })
                                .collect(),
                        });
                    }
                }
                Some(indices)
            } else {
                None
            }
        }
    };

    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                ScalarType::Map { value_type, .. }
                    if matches!(&**value_type, ScalarType::String | ScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };
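
    // Illustrative shape of a sink using the HEADERS option validated above
    // (spelling is illustrative; the column must have type `map[text => text]`
    // or `map[text => bytea]`):
    //
    //     CREATE SINK s FROM t
    //     INTO KAFKA CONNECTION kconn (TOPIC 'events')
    //     KEY (id) HEADERS my_headers
    //     FORMAT JSON ENVELOPE UPSERT;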

    // Pick the first valid natural relation key, if any.
    let relation_key_indices = desc.typ().keys.first().cloned();

    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = RelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
        )?,
    };

    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
    } = with_options.try_into()?;

    // WITH SNAPSHOT defaults to true
    let with_snapshot = snapshot.unwrap_or(true);
    // VERSION defaults to 0
    let version = version.unwrap_or(0);

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}
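
// An example of a statement this function plans end-to-end (illustrative):
//
//     CREATE SINK s
//     FROM my_matview
//     INTO KAFKA CONNECTION kconn (TOPIC 'events')
//     KEY (id)
//     FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
//     ENVELOPE UPSERT
//     WITH (SNAPSHOT = false);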

fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");

    let existing_keys = desc
        .typ()
        .keys
        .iter()
        .map(|key_columns| {
            key_columns
                .iter()
                .map(|col| desc.get_name(*col).as_str())
                .join(", ")
        })
        .join(", ");

    sql_err!(
        "Key constraint ({}) conflicts with existing key ({})",
        user_keys,
        existing_keys
    )
}

/// Created by hand instead of via the `generate_extracted_config!` macro
/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    pub(crate) null_defaults: bool,
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}

impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}
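
// Illustrative use of the doc-comment options extracted above: a bare
// `DOC ON` (no KEY/VALUE qualifier) lands in `common_doc_comments` and is then
// copied into whichever of the key/value maps lacks a more specific entry.
//
//     ... FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (
//         DOC ON COLUMN t.a = 'documents column a in both schemas',
//         VALUE DOC ON TYPE t = 'documents the value record only'
//     ) ...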

fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // Get Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the sink.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Helper method to parse avro connection options for format specifiers that use avro
    // for either key or value encoding.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &ScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated()?;

            Some(expr)
        }
        _ => None,
    };
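
    // Sketch of a PARTITION BY expression as validated above: it is planned
    // against the sink's value columns and assignment-cast to uint8
    // (`ScalarType::UInt64`); the hash function here is illustrative:
    //
    //     ... INTO KAFKA CONNECTION kconn (
    //         TOPIC 'events',
    //         PARTITION BY = seahash(id::text)
    //     ) ...
    //
    // Under ENVELOPE DEBEZIUM only key columns may be referenced, per the
    // `error_if_referenced` marking above.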

    // Map from the format specifier of the statement to the individual key/value formats for the sink.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}

pub fn describe_create_index(
    _: &StatementContext,
    _: CreateIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;
    let on_item_type = on.item_type();

    if !matches!(
        on_item_type,
        CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Source
            | CatalogItemType::Table
    ) {
        sql_bail!(
            "index cannot be created on {} because it is a {}",
            on_name.full_name_str(),
            on.item_type()
        )
    }

    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // `key_parts` is None if we're creating a "default" index.
            on.desc(&scx.catalog.resolve_full_name(on.name()))?
                .typ()
                .default_key()
                .iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            // We're trying to create the "default" index.
            idx_name.item += "_primary_idx";
        } else {
            // Use the PostgreSQL scheme for automatically naming indexes:
            // `<table>_<_-separated indexed expressions>_idx`
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };
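
    // Naming sketch for the branches above: `CREATE DEFAULT INDEX ON t` yields
    // `t_primary_idx`, while `CREATE INDEX ON t (a, b)` yields `t_a_b_idx`;
    // unless IF NOT EXISTS was given, `find_available_name` then deduplicates
    // the name if it is already taken.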

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating indexes when
    // there is an existing object *or* type of the same name.
    //
    // Technically, we only need to prevent coexistence of indexes and types
    // that have an associated relation (record types but not list/map types).
    // Enforcing that would be more complicated, though. It's backwards
    // compatible to weaken this restriction in the future.
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Normalize `stmt`.
    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}
4096
4097pub fn describe_create_type(
4098    _: &StatementContext,
4099    _: CreateTypeStatement<Aug>,
4100) -> Result<StatementDesc, PlanError> {
4101    Ok(StatementDesc::new(None))
4102}
4103
4104pub fn plan_create_type(
4105    scx: &StatementContext,
4106    stmt: CreateTypeStatement<Aug>,
4107) -> Result<Plan, PlanError> {
4108    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4109    let CreateTypeStatement { name, as_type, .. } = stmt;
4110
4111    fn validate_data_type(
4112        scx: &StatementContext,
4113        data_type: ResolvedDataType,
4114        as_type: &str,
4115        key: &str,
4116    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4117        let (id, modifiers) = match data_type {
4118            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4119            _ => sql_bail!(
4120                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4121                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4122                as_type,
4123                key,
4124                data_type.human_readable_name(),
4125            ),
4126        };
4127
4128        let item = scx.catalog.get_item(&id);
4129        match item.type_details() {
4130            None => sql_bail!(
4131                "{} must be of class type, but received {} which is of class {}",
4132                key,
4133                scx.catalog.resolve_full_name(item.name()),
4134                item.item_type()
4135            ),
4136            Some(CatalogTypeDetails {
4137                typ: CatalogType::Char,
4138                ..
4139            }) => {
4140                bail_unsupported!("embedding char type in a list or map")
4141            }
4142            _ => {
4143                // Validate that the modifiers are actually valid.
4144                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4145
4146                Ok((id, modifiers))
4147            }
4148        }
4149    }
4150
4151    let inner = match as_type {
4152        CreateTypeAs::List { options } => {
4153            let CreateTypeListOptionExtracted {
4154                element_type,
4155                seen: _,
4156            } = CreateTypeListOptionExtracted::try_from(options)?;
4157            let element_type =
4158                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4159            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4160            CatalogType::List {
4161                element_reference: id,
4162                element_modifiers: modifiers,
4163            }
4164        }
4165        CreateTypeAs::Map { options } => {
4166            let CreateTypeMapOptionExtracted {
4167                key_type,
4168                value_type,
4169                seen: _,
4170            } = CreateTypeMapOptionExtracted::try_from(options)?;
4171            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4172            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4173            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4174            let (value_id, value_modifiers) =
4175                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4176            CatalogType::Map {
4177                key_reference: key_id,
4178                key_modifiers,
4179                value_reference: value_id,
4180                value_modifiers,
4181            }
4182        }
4183        CreateTypeAs::Record { column_defs } => {
4184            let mut fields = vec![];
4185            for column_def in column_defs {
4186                let data_type = column_def.data_type;
4187                let key = ident(column_def.name.clone());
4188                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4189                fields.push(CatalogRecordField {
4190                    name: ColumnName::from(key.clone()),
4191                    type_reference: id,
4192                    type_modifiers: modifiers,
4193                });
4194            }
4195            CatalogType::Record { fields }
4196        }
4197    };
4198
4199    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4200
4201    // Check for an object in the catalog with this same name
4202    let full_name = scx.catalog.resolve_full_name(&name);
4203    let partial_name = PartialItemName::from(full_name.clone());
4204    // For PostgreSQL compatibility, we need to prevent creating types when
4205    // there is an existing object *or* type of the same name.
4206    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4207        if item.item_type().conflicts_with_type() {
4208            return Err(PlanError::ItemAlreadyExists {
4209                name: full_name.to_string(),
4210                item_type: item.item_type(),
4211            });
4212        }
4213    }
4214
4215    Ok(Plan::CreateType(CreateTypePlan {
4216        name,
4217        typ: Type { create_sql, inner },
4218    }))
4219}
4220
4221generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4222
4223generate_extracted_config!(
4224    CreateTypeMapOption,
4225    (KeyType, ResolvedDataType),
4226    (ValueType, ResolvedDataType)
4227);
4228
4229#[derive(Debug)]
4230pub enum PlannedAlterRoleOption {
4231    Attributes(PlannedRoleAttributes),
4232    Variable(PlannedRoleVariable),
4233}
4234
4235#[derive(Debug, Clone)]
4236pub struct PlannedRoleAttributes {
4237    pub inherit: Option<bool>,
4238    pub password: Option<Password>,
4239    /// `nopassword` is set to true if the password from the parser is `None`.
4240    /// This is semantically different from not supplying a password at all:
4241    /// it allows an existing password to be unset.
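        /// (E.g., PostgreSQL-style `ALTER ROLE r WITH PASSWORD NULL` unsets the
        /// password, whereas omitting the option leaves it unchanged.)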
4242    pub nopassword: Option<bool>,
4243    pub superuser: Option<bool>,
4244    pub login: Option<bool>,
4245}
4246
4247fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4248    let mut planned_attributes = PlannedRoleAttributes {
4249        inherit: None,
4250        password: None,
4251        superuser: None,
4252        login: None,
4253        nopassword: None,
4254    };
4255
4256    for option in options {
4257        match option {
4258            RoleAttribute::Inherit | RoleAttribute::NoInherit
4259                if planned_attributes.inherit.is_some() =>
4260            {
4261                sql_bail!("conflicting or redundant options");
4262            }
4263            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4264                bail_never_supported!(
4265                    "CREATECLUSTER attribute",
4266                    "sql/create-role/#details",
4267                    "Use system privileges instead."
4268                );
4269            }
4270            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4271                bail_never_supported!(
4272                    "CREATEDB attribute",
4273                    "sql/create-role/#details",
4274                    "Use system privileges instead."
4275                );
4276            }
4277            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4278                bail_never_supported!(
4279                    "CREATEROLE attribute",
4280                    "sql/create-role/#details",
4281                    "Use system privileges instead."
4282                );
4283            }
4284            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4285                sql_bail!("conflicting or redundant options");
4286            }
4287
4288            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4289            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4290            RoleAttribute::Password(password) => {
4291                if let Some(password) = password {
4292                    planned_attributes.password = Some(password.into());
4293                } else {
4294                    planned_attributes.nopassword = Some(true);
4295                }
4296            }
4297            RoleAttribute::SuperUser => {
4298                if planned_attributes.superuser == Some(false) {
4299                    sql_bail!("conflicting or redundant options");
4300                }
4301                planned_attributes.superuser = Some(true);
4302            }
4303            RoleAttribute::NoSuperUser => {
4304                if planned_attributes.superuser == Some(true) {
4305                    sql_bail!("conflicting or redundant options");
4306                }
4307                planned_attributes.superuser = Some(false);
4308            }
4309            RoleAttribute::Login => {
4310                if planned_attributes.login == Some(false) {
4311                    sql_bail!("conflicting or redundant options");
4312                }
4313                planned_attributes.login = Some(true);
4314            }
4315            RoleAttribute::NoLogin => {
4316                if planned_attributes.login == Some(true) {
4317                    sql_bail!("conflicting or redundant options");
4318                }
4319                planned_attributes.login = Some(false);
4320            }
4321        }
4322    }
4323    if planned_attributes.inherit == Some(false) {
4324        bail_unsupported!("non inherit roles");
4325    }
4326
4327    Ok(planned_attributes)
4328}
4329
4330#[derive(Debug)]
4331pub enum PlannedRoleVariable {
4332    Set { name: String, value: VariableValue },
4333    Reset { name: String },
4334}
4335
4336impl PlannedRoleVariable {
4337    pub fn name(&self) -> &str {
4338        match self {
4339            PlannedRoleVariable::Set { name, .. } => name,
4340            PlannedRoleVariable::Reset { name } => name,
4341        }
4342    }
4343}
4344
4345fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4346    let plan = match variable {
4347        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4348            name: name.to_string(),
4349            value: scl::plan_set_variable_to(value)?,
4350        },
4351        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4352            name: name.to_string(),
4353        },
4354    };
4355    Ok(plan)
4356}
4357
4358pub fn describe_create_role(
4359    _: &StatementContext,
4360    _: CreateRoleStatement,
4361) -> Result<StatementDesc, PlanError> {
4362    Ok(StatementDesc::new(None))
4363}
4364
4365pub fn plan_create_role(
4366    _: &StatementContext,
4367    CreateRoleStatement { name, options }: CreateRoleStatement,
4368) -> Result<Plan, PlanError> {
4369    let attributes = plan_role_attributes(options)?;
4370    Ok(Plan::CreateRole(CreateRolePlan {
4371        name: normalize::ident(name),
4372        attributes: attributes.into(),
4373    }))
4374}
4375
4376pub fn plan_create_network_policy(
4377    ctx: &StatementContext,
4378    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4379) -> Result<Plan, PlanError> {
4380    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4381    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4382
4383    let Some(rule_defs) = policy_options.rules else {
4384        sql_bail!("RULES must be specified when creating network policies.");
4385    };
4386
4387    let mut rules = vec![];
4388    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4389        let NetworkPolicyRuleOptionExtracted {
4390            seen: _,
4391            direction,
4392            action,
4393            address,
4394        } = options.try_into()?;
4395        let (direction, action, address) = match (direction, action, address) {
4396            (Some(direction), Some(action), Some(address)) => (
4397                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4398                NetworkPolicyRuleAction::try_from(action.as_str())?,
4399                PolicyAddress::try_from(address.as_str())?,
4400            ),
4401            (_, _, _) => {
4402                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4403            }
4404        };
4405        rules.push(NetworkPolicyRule {
4406            name: normalize::ident(name),
4407            direction,
4408            action,
4409            address,
4410        });
4411    }
4412
4413    if rules.len()
4414        > ctx
4415            .catalog
4416            .system_vars()
4417            .max_rules_per_network_policy()
4418            .try_into()?
4419    {
4420        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4421    }
4422
4423    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4424        name: normalize::ident(name),
4425        rules,
4426    }))
4427}
4428
4429pub fn plan_alter_network_policy(
4430    ctx: &StatementContext,
4431    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4432) -> Result<Plan, PlanError> {
4433    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4434
4435    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4436    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4437
4438    let Some(rule_defs) = policy_options.rules else {
4439        sql_bail!("RULES must be specified when creating network policies.");
4440    };
4441
4442    let mut rules = vec![];
4443    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4444        let NetworkPolicyRuleOptionExtracted {
4445            seen: _,
4446            direction,
4447            action,
4448            address,
4449        } = options.try_into()?;
4450
4451        let (direction, action, address) = match (direction, action, address) {
4452            (Some(direction), Some(action), Some(address)) => (
4453                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4454                NetworkPolicyRuleAction::try_from(action.as_str())?,
4455                PolicyAddress::try_from(address.as_str())?,
4456            ),
4457            (_, _, _) => {
4458                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4459            }
4460        };
4461        rules.push(NetworkPolicyRule {
4462            name: normalize::ident(name),
4463            direction,
4464            action,
4465            address,
4466        });
4467    }
4468    if rules.len()
4469        > ctx
4470            .catalog
4471            .system_vars()
4472            .max_rules_per_network_policy()
4473            .try_into()?
4474    {
4475        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4476    }
4477
4478    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4479        id: policy.id(),
4480        name: normalize::ident(name),
4481        rules,
4482    }))
4483}
4484
4485pub fn describe_create_cluster(
4486    _: &StatementContext,
4487    _: CreateClusterStatement<Aug>,
4488) -> Result<StatementDesc, PlanError> {
4489    Ok(StatementDesc::new(None))
4490}
4491
4492// WARNING:
4493// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4494// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4495// that option stays the same. If you were to give a default value here, then not giving that option
4496// to ALTER CLUSTER would always reset the value of that option to the default.
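    // For example, if `Size` had a built-in default here (hypothetical), then
    // `ALTER CLUSTER c SET (REPLICATION FACTOR = 2)` would silently reset the
    // cluster's size to that default instead of leaving it unchanged.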
4497generate_extracted_config!(
4498    ClusterOption,
4499    (AvailabilityZones, Vec<String>),
4500    (Disk, bool),
4501    (IntrospectionDebugging, bool),
4502    (IntrospectionInterval, OptionalDuration),
4503    (Managed, bool),
4504    (Replicas, Vec<ReplicaDefinition<Aug>>),
4505    (ReplicationFactor, u32),
4506    (Size, String),
4507    (Schedule, ClusterScheduleOptionValue),
4508    (WorkloadClass, OptionalString)
4509);
4510
4511generate_extracted_config!(
4512    NetworkPolicyOption,
4513    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4514);
4515
4516generate_extracted_config!(
4517    NetworkPolicyRuleOption,
4518    (Direction, String),
4519    (Action, String),
4520    (Address, String)
4521);
4522
4523generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4524
4525generate_extracted_config!(
4526    ClusterAlterUntilReadyOption,
4527    (Timeout, Duration),
4528    (OnTimeout, String)
4529);
4530
4531generate_extracted_config!(
4532    ClusterFeature,
4533    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4534    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4535    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4536    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4537    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4538    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4539    (
4540        EnableProjectionPushdownAfterRelationCse,
4541        Option<bool>,
4542        Default(None)
4543    )
4544);
4545
4546/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4547///
4548/// The reverse of [`unplan_create_cluster`].
4549pub fn plan_create_cluster(
4550    scx: &StatementContext,
4551    stmt: CreateClusterStatement<Aug>,
4552) -> Result<Plan, PlanError> {
4553    let plan = plan_create_cluster_inner(scx, stmt)?;
4554
4555    // Roundtrip through unplan and make sure that we end up with the same plan.
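        // (Sketch of the invariant: plan -> unplan -> SQL text -> parse -> resolve
        // -> plan must be a fixed point; any divergence surfaces as
        // `PlanError::Replan`.)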
4556    if let CreateClusterVariant::Managed(_) = &plan.variant {
4557        let stmt = unplan_create_cluster(scx, plan.clone())
4558            .map_err(|e| PlanError::Replan(e.to_string()))?;
4559        let create_sql = stmt.to_ast_string_stable();
4560        let stmt = parse::parse(&create_sql)
4561            .map_err(|e| PlanError::Replan(e.to_string()))?
4562            .into_element()
4563            .ast;
4564        let (stmt, _resolved_ids) =
4565            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4566        let stmt = match stmt {
4567            Statement::CreateCluster(stmt) => stmt,
4568            stmt => {
4569                return Err(PlanError::Replan(format!(
4570                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4571                )));
4572            }
4573        };
4574        let replan =
4575            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4576        if plan != replan {
4577            return Err(PlanError::Replan(format!(
4578                "replan does not match: plan={plan:?}, replan={replan:?}"
4579            )));
4580        }
4581    }
4582
4583    Ok(Plan::CreateCluster(plan))
4584}
4585
4586pub fn plan_create_cluster_inner(
4587    scx: &StatementContext,
4588    CreateClusterStatement {
4589        name,
4590        options,
4591        features,
4592    }: CreateClusterStatement<Aug>,
4593) -> Result<CreateClusterPlan, PlanError> {
4594    let ClusterOptionExtracted {
4595        availability_zones,
4596        introspection_debugging,
4597        introspection_interval,
4598        managed,
4599        replicas,
4600        replication_factor,
4601        seen: _,
4602        size,
4603        disk,
4604        schedule,
4605        workload_class,
4606    }: ClusterOptionExtracted = options.try_into()?;
4607
4608    let managed = managed.unwrap_or_else(|| replicas.is_none());
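        // E.g., `CREATE CLUSTER c (SIZE = '100cc')` defaults to managed, while
        // `CREATE CLUSTER c (REPLICAS (r1 (SIZE = '100cc')))` defaults to unmanaged
        // (illustrative size names).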
4609
4610    if !scx.catalog.active_role_id().is_system() {
4611        if !features.is_empty() {
4612            sql_bail!("FEATURES not supported for non-system users");
4613        }
4614        if workload_class.is_some() {
4615            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4616        }
4617    }
4618
4619    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4620    let workload_class = workload_class.and_then(|v| v.0);
4621
4622    if managed {
4623        if replicas.is_some() {
4624            sql_bail!("REPLICAS not supported for managed clusters");
4625        }
4626        let Some(size) = size else {
4627            sql_bail!("SIZE must be specified for managed clusters");
4628        };
4629
4630        if disk.is_some() {
4631            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4632            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4633            // we'll be able to remove the `DISK` option entirely.
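                // (E.g., `DISK = TRUE` with a modern `cc` size such as '100cc' is
                // rejected just below, while with a legacy size it only draws a
                // deprecation notice.)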
4634            if scx.catalog.is_cluster_size_cc(&size) {
4635                sql_bail!(
4636                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4637                );
4638            }
4639
4640            scx.catalog
4641                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4642        }
4643
4644        let compute = plan_compute_replica_config(
4645            introspection_interval,
4646            introspection_debugging.unwrap_or(false),
4647        )?;
4648
4649        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4650            replication_factor.unwrap_or_else(|| {
4651                scx.catalog
4652                    .system_vars()
4653                    .default_cluster_replication_factor()
4654            })
4655        } else {
4656            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4657            if replication_factor.is_some() {
4658                sql_bail!(
4659                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4660                );
4661            }
4662            // If we have a non-trivial schedule, start with no replicas, to avoid
4663            // quickly flip-flopping in case the schedule doesn't want a replica
4664            // right away.
4665            0
4666        };
4667        let availability_zones = availability_zones.unwrap_or_default();
4668
4669        if !availability_zones.is_empty() {
4670            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4671        }
4672
4673        // Plan OptimizerFeatureOverrides.
4674        let ClusterFeatureExtracted {
4675            reoptimize_imported_views,
4676            enable_eager_delta_joins,
4677            enable_new_outer_join_lowering,
4678            enable_variadic_left_join_lowering,
4679            enable_letrec_fixpoint_analysis,
4680            enable_join_prioritize_arranged,
4681            enable_projection_pushdown_after_relation_cse,
4682            seen: _,
4683        } = ClusterFeatureExtracted::try_from(features)?;
4684        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4685            reoptimize_imported_views,
4686            enable_eager_delta_joins,
4687            enable_new_outer_join_lowering,
4688            enable_variadic_left_join_lowering,
4689            enable_letrec_fixpoint_analysis,
4690            enable_join_prioritize_arranged,
4691            enable_projection_pushdown_after_relation_cse,
4692            ..Default::default()
4693        };
4694
4695        let schedule = plan_cluster_schedule(schedule)?;
4696
4697        Ok(CreateClusterPlan {
4698            name: normalize::ident(name),
4699            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4700                replication_factor,
4701                size,
4702                availability_zones,
4703                compute,
4704                optimizer_feature_overrides,
4705                schedule,
4706            }),
4707            workload_class,
4708        })
4709    } else {
4710        let Some(replica_defs) = replicas else {
4711            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4712        };
4713        if availability_zones.is_some() {
4714            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4715        }
4716        if replication_factor.is_some() {
4717            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4718        }
4719        if introspection_debugging.is_some() {
4720            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4721        }
4722        if introspection_interval.is_some() {
4723            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4724        }
4725        if size.is_some() {
4726            sql_bail!("SIZE not supported for unmanaged clusters");
4727        }
4728        if disk.is_some() {
4729            sql_bail!("DISK not supported for unmanaged clusters");
4730        }
4731        if !features.is_empty() {
4732            sql_bail!("FEATURES not supported for unmanaged clusters");
4733        }
4734        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4735            sql_bail!(
4736                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4737            );
4738        }
4739
4740        let mut replicas = vec![];
4741        for ReplicaDefinition { name, options } in replica_defs {
4742            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4743        }
4744
4745        Ok(CreateClusterPlan {
4746            name: normalize::ident(name),
4747            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4748            workload_class,
4749        })
4750    }
4751}
4752
4753/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4754///
4755/// The reverse of [`plan_create_cluster`].
4756pub fn unplan_create_cluster(
4757    scx: &StatementContext,
4758    CreateClusterPlan {
4759        name,
4760        variant,
4761        workload_class,
4762    }: CreateClusterPlan,
4763) -> Result<CreateClusterStatement<Aug>, PlanError> {
4764    match variant {
4765        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4766            replication_factor,
4767            size,
4768            availability_zones,
4769            compute,
4770            optimizer_feature_overrides,
4771            schedule,
4772        }) => {
4773            let schedule = unplan_cluster_schedule(schedule);
4774            let OptimizerFeatureOverrides {
4775                enable_guard_subquery_tablefunc: _,
4776                enable_consolidate_after_union_negate: _,
4777                enable_reduce_mfp_fusion: _,
4778                enable_cardinality_estimates: _,
4779                persist_fast_path_limit: _,
4780                reoptimize_imported_views,
4781                enable_eager_delta_joins,
4782                enable_new_outer_join_lowering,
4783                enable_variadic_left_join_lowering,
4784                enable_letrec_fixpoint_analysis,
4785                enable_reduce_reduction: _,
4786                enable_join_prioritize_arranged,
4787                enable_projection_pushdown_after_relation_cse,
4788                enable_less_reduce_in_eqprop: _,
4789                enable_dequadratic_eqprop_map: _,
4790                enable_eq_classes_withholding_errors: _,
4791                enable_fast_path_plan_insights: _,
4792            } = optimizer_feature_overrides;
4793            // Fields destructured above that don't appear below are not wired up to cluster features.
4794            let features_extracted = ClusterFeatureExtracted {
4795                // Seen is ignored when unplanning.
4796                seen: Default::default(),
4797                reoptimize_imported_views,
4798                enable_eager_delta_joins,
4799                enable_new_outer_join_lowering,
4800                enable_variadic_left_join_lowering,
4801                enable_letrec_fixpoint_analysis,
4802                enable_join_prioritize_arranged,
4803                enable_projection_pushdown_after_relation_cse,
4804            };
4805            let features = features_extracted.into_values(scx.catalog);
4806            let availability_zones = if availability_zones.is_empty() {
4807                None
4808            } else {
4809                Some(availability_zones)
4810            };
4811            let (introspection_interval, introspection_debugging) =
4812                unplan_compute_replica_config(compute);
4813            // Replication factor cannot be explicitly specified together with a refresh
4814            // schedule; in that case it is always 1 or less.
4815            let replication_factor = match &schedule {
4816                ClusterScheduleOptionValue::Manual => Some(replication_factor),
4817                ClusterScheduleOptionValue::Refresh { .. } => {
4818                    assert!(
4819                        replication_factor <= 1,
4820                        "replication factor, {replication_factor:?}, must be <= 1"
4821                    );
4822                    None
4823                }
4824            };
4825            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4826            let options_extracted = ClusterOptionExtracted {
4827                // Seen is ignored when unplanning.
4828                seen: Default::default(),
4829                availability_zones,
4830                disk: None,
4831                introspection_debugging: Some(introspection_debugging),
4832                introspection_interval,
4833                managed: Some(true),
4834                replicas: None,
4835                replication_factor,
4836                size: Some(size),
4837                schedule: Some(schedule),
4838                workload_class,
4839            };
4840            let options = options_extracted.into_values(scx.catalog);
4841            let name = Ident::new_unchecked(name);
4842            Ok(CreateClusterStatement {
4843                name,
4844                options,
4845                features,
4846            })
4847        }
4848        CreateClusterVariant::Unmanaged(_) => {
4849            bail_unsupported!("SHOW CREATE for unmanaged clusters")
4850        }
4851    }
4852}
4853
4854generate_extracted_config!(
4855    ReplicaOption,
4856    (AvailabilityZone, String),
4857    (BilledAs, String),
4858    (ComputeAddresses, Vec<String>),
4859    (ComputectlAddresses, Vec<String>),
4860    (Disk, bool),
4861    (Internal, bool, Default(false)),
4862    (IntrospectionDebugging, bool, Default(false)),
4863    (IntrospectionInterval, OptionalDuration),
4864    (Size, String),
4865    (StorageAddresses, Vec<String>),
4866    (StoragectlAddresses, Vec<String>),
4867    (Workers, u16)
4868);
4869
4870fn plan_replica_config(
4871    scx: &StatementContext,
4872    options: Vec<ReplicaOption<Aug>>,
4873) -> Result<ReplicaConfig, PlanError> {
4874    let ReplicaOptionExtracted {
4875        availability_zone,
4876        billed_as,
4877        computectl_addresses,
4878        disk,
4879        internal,
4880        introspection_debugging,
4881        introspection_interval,
4882        size,
4883        storagectl_addresses,
4884        ..
4885    }: ReplicaOptionExtracted = options.try_into()?;
4886
4887    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4888
4889    match (
4890        size,
4891        availability_zone,
4892        billed_as,
4893        storagectl_addresses,
4894        computectl_addresses,
4895    ) {
4896        // Common cases we expect end users to hit.
4897        (None, _, None, None, None) => {
4898            // We don't mention the unmanaged options in the error message
4899            // because they are only available in unsafe mode.
4900            sql_bail!("SIZE option must be specified");
4901        }
4902        (Some(size), availability_zone, billed_as, None, None) => {
4903            if disk.is_some() {
4904                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4905                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4906                // we'll be able to remove the `DISK` option entirely.
4907                if scx.catalog.is_cluster_size_cc(&size) {
4908                    sql_bail!(
4909                        "DISK option not supported for modern cluster sizes because disk is always enabled"
4910                    );
4911                }
4912
4913                scx.catalog
4914                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4915            }
4916
4917            Ok(ReplicaConfig::Orchestrated {
4918                size,
4919                availability_zone,
4920                compute,
4921                billed_as,
4922                internal,
4923            })
4924        }
4925
4926        (None, None, None, storagectl_addresses, computectl_addresses) => {
4927            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
4928
4929            // When manually testing Materialize in unsafe mode, it's easy to
4930            // accidentally omit one of these options, so we try to produce
4931            // helpful error messages.
4932            let Some(storagectl_addrs) = storagectl_addresses else {
4933                sql_bail!("missing STORAGECTL ADDRESSES option");
4934            };
4935            let Some(computectl_addrs) = computectl_addresses else {
4936                sql_bail!("missing COMPUTECTL ADDRESSES option");
4937            };
4938
4939            if storagectl_addrs.len() != computectl_addrs.len() {
4940                sql_bail!(
4941                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
4942                );
4943            }
4944
4945            if disk.is_some() {
4946                sql_bail!("DISK can't be specified for unorchestrated clusters");
4947            }
4948
4949            Ok(ReplicaConfig::Unorchestrated {
4950                storagectl_addrs,
4951                computectl_addrs,
4952                compute,
4953            })
4954        }
4955        _ => {
4956            // We don't bother trying to produce a more helpful error message
4957            // here because no user is likely to hit this path.
4958            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
4959        }
4960    }
4961}
4962
4963/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
4964///
4965/// The reverse of [`unplan_compute_replica_config`].
4966fn plan_compute_replica_config(
4967    introspection_interval: Option<OptionalDuration>,
4968    introspection_debugging: bool,
4969) -> Result<ComputeReplicaConfig, PlanError> {
4970    let introspection_interval = introspection_interval
4971        .map(|OptionalDuration(i)| i)
4972        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
4973    let introspection = match introspection_interval {
4974        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
4975            interval,
4976            debugging: introspection_debugging,
4977        }),
4978        None if introspection_debugging => {
4979            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
4980        }
4981        None => None,
4982    };
4983    let compute = ComputeReplicaConfig { introspection };
4984    Ok(compute)
4985}
4986
4987/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
4988///
4989/// The reverse of [`plan_compute_replica_config`].
4990fn unplan_compute_replica_config(
4991    compute_replica_config: ComputeReplicaConfig,
4992) -> (Option<OptionalDuration>, bool) {
4993    match compute_replica_config.introspection {
4994        Some(ComputeReplicaIntrospectionConfig {
4995            debugging,
4996            interval,
4997        }) => (Some(OptionalDuration(Some(interval))), debugging),
4998        None => (Some(OptionalDuration(None)), false),
4999    }
5000}
5001
5002/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5003///
5004/// The reverse of [`unplan_cluster_schedule`].
5005fn plan_cluster_schedule(
5006    schedule: ClusterScheduleOptionValue,
5007) -> Result<ClusterSchedule, PlanError> {
5008    Ok(match schedule {
5009        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5010        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5011        ClusterScheduleOptionValue::Refresh {
5012            hydration_time_estimate: None,
5013        } => ClusterSchedule::Refresh {
5014            hydration_time_estimate: Duration::from_millis(0),
5015        },
5016        // Otherwise we convert the `IntervalValue` to a `Duration`.
5017        ClusterScheduleOptionValue::Refresh {
5018            hydration_time_estimate: Some(interval_value),
5019        } => {
5020            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5021            if interval.as_microseconds() < 0 {
5022                sql_bail!(
5023                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5024                    interval
5025                );
5026            }
5027            if interval.months != 0 {
5028                // This limitation exists because we want this interval to be cleanly
5029                // convertible to a Unix epoch timestamp difference. That no longer holds
5030                // when the interval involves months, because months have variable lengths.
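                    // (E.g., an estimate of '1 month' is rejected here, while
                    // '30 days' is accepted.)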
5031                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5032            }
5033            let duration = interval.duration()?;
5034            if u64::try_from(duration.as_millis()).is_err()
5035                || Interval::from_duration(&duration).is_err()
5036            {
5037                sql_bail!("HYDRATION TIME ESTIMATE too large");
5038            }
5039            ClusterSchedule::Refresh {
5040                hydration_time_estimate: duration,
5041            }
5042        }
5043    })
5044}
5045
5046/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5047///
5048/// The reverse of [`plan_cluster_schedule`].
5049fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5050    match schedule {
5051        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5052        ClusterSchedule::Refresh {
5053            hydration_time_estimate,
5054        } => {
5055            let interval = Interval::from_duration(&hydration_time_estimate)
5056                .expect("planning ensured that this is convertible back to Interval");
5057            let interval_value = literal::unplan_interval(&interval);
5058            ClusterScheduleOptionValue::Refresh {
5059                hydration_time_estimate: Some(interval_value),
5060            }
5061        }
5062    }
5063}
5064
5065pub fn describe_create_cluster_replica(
5066    _: &StatementContext,
5067    _: CreateClusterReplicaStatement<Aug>,
5068) -> Result<StatementDesc, PlanError> {
5069    Ok(StatementDesc::new(None))
5070}
5071
5072pub fn plan_create_cluster_replica(
5073    scx: &StatementContext,
5074    CreateClusterReplicaStatement {
5075        definition: ReplicaDefinition { name, options },
5076        of_cluster,
5077    }: CreateClusterReplicaStatement<Aug>,
5078) -> Result<Plan, PlanError> {
5079    let cluster = scx
5080        .catalog
5081        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5082    let current_replica_count = cluster.replica_ids().iter().count();
5083    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5084        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5085        return Err(PlanError::CreateReplicaFailStorageObjects {
5086            current_replica_count,
5087            internal_replica_count,
5088            hypothetical_replica_count: current_replica_count + 1,
5089        });
5090    }
5091
5092    let config = plan_replica_config(scx, options)?;
5093
5094    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5095        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5096            return Err(PlanError::MangedReplicaName(name.into_string()));
5097        }
5098    } else {
5099        ensure_cluster_is_not_managed(scx, cluster.id())?;
5100    }
5101
5102    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5103        name: normalize::ident(name),
5104        cluster_id: cluster.id(),
5105        config,
5106    }))
5107}
5108
5109pub fn describe_create_secret(
5110    _: &StatementContext,
5111    _: CreateSecretStatement<Aug>,
5112) -> Result<StatementDesc, PlanError> {
5113    Ok(StatementDesc::new(None))
5114}
5115
5116pub fn plan_create_secret(
5117    scx: &StatementContext,
5118    stmt: CreateSecretStatement<Aug>,
5119) -> Result<Plan, PlanError> {
5120    let CreateSecretStatement {
5121        name,
5122        if_not_exists,
5123        value,
5124    } = &stmt;
5125
5126    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5127    let mut create_sql_statement = stmt.clone();
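        // Redact the secret's value so that the recorded `create_sql` never
        // contains it.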
5128    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5129    let create_sql =
5130        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5131    let secret_as = query::plan_secret_as(scx, value.clone())?;
5132
5133    let secret = Secret {
5134        create_sql,
5135        secret_as,
5136    };
5137
5138    Ok(Plan::CreateSecret(CreateSecretPlan {
5139        name,
5140        secret,
5141        if_not_exists: *if_not_exists,
5142    }))
5143}
5144
5145pub fn describe_create_connection(
5146    _: &StatementContext,
5147    _: CreateConnectionStatement<Aug>,
5148) -> Result<StatementDesc, PlanError> {
5149    Ok(StatementDesc::new(None))
5150}
5151
5152generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5153
5154pub fn plan_create_connection(
5155    scx: &StatementContext,
5156    mut stmt: CreateConnectionStatement<Aug>,
5157) -> Result<Plan, PlanError> {
5158    let CreateConnectionStatement {
5159        name,
5160        connection_type,
5161        values,
5162        if_not_exists,
5163        with_options,
5164    } = stmt.clone();
5165    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5166    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5167    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5168
5169    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5170    if options.validate.is_some() {
5171        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5172    }
5173    let validate = match options.validate {
5174        Some(val) => val,
5175        None => {
5176            scx.catalog
5177                .system_vars()
5178                .enable_default_connection_validation()
5179                && details.to_connection().validate_by_default()
5180        }
5181    };
5182
5183    // Check for an object in the catalog with this same name
5184    let full_name = scx.catalog.resolve_full_name(&name);
5185    let partial_name = PartialItemName::from(full_name.clone());
5186    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5187        return Err(PlanError::ItemAlreadyExists {
5188            name: full_name.to_string(),
5189            item_type: item.item_type(),
5190        });
5191    }
5192
5193    // For SSH connections, overwrite the public key options based on the
5194    // connection details, in case we generated new keys during planning.
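        // (The normalized `create_sql` recorded below therefore carries the actual
        // public keys, e.g. as the `PUBLIC KEY 1`/`PUBLIC KEY 2` option values.)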
5195    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5196        stmt.values.retain(|v| {
5197            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5198        });
5199        stmt.values.push(ConnectionOption {
5200            name: ConnectionOptionName::PublicKey1,
5201            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5202        });
5203        stmt.values.push(ConnectionOption {
5204            name: ConnectionOptionName::PublicKey2,
5205            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5206        });
5207    }
5208    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5209
5210    let plan = CreateConnectionPlan {
5211        name,
5212        if_not_exists,
5213        connection: crate::plan::Connection {
5214            create_sql,
5215            details,
5216        },
5217        validate,
5218    };
5219    Ok(Plan::CreateConnection(plan))
5220}
5221
5222fn plan_drop_database(
5223    scx: &StatementContext,
5224    if_exists: bool,
5225    name: &UnresolvedDatabaseName,
5226    cascade: bool,
5227) -> Result<Option<DatabaseId>, PlanError> {
5228    Ok(match resolve_database(scx, name, if_exists)? {
5229        Some(database) => {
5230            if !cascade && database.has_schemas() {
5231                sql_bail!(
5232                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5233                    name,
5234                );
5235            }
5236            Some(database.id())
5237        }
5238        None => None,
5239    })
5240}
5241
5242pub fn describe_drop_objects(
5243    _: &StatementContext,
5244    _: DropObjectsStatement,
5245) -> Result<StatementDesc, PlanError> {
5246    Ok(StatementDesc::new(None))
5247}
5248
5249pub fn plan_drop_objects(
5250    scx: &mut StatementContext,
5251    DropObjectsStatement {
5252        object_type,
5253        if_exists,
5254        names,
5255        cascade,
5256    }: DropObjectsStatement,
5257) -> Result<Plan, PlanError> {
5258    assert_ne!(
5259        object_type,
5260        mz_sql_parser::ast::ObjectType::Func,
5261        "rejected in parser"
5262    );
5263    let object_type = object_type.into();
5264
5265    let mut referenced_ids = Vec::new();
5266    for name in names {
5267        let id = match &name {
5268            UnresolvedObjectName::Cluster(name) => {
5269                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5270            }
5271            UnresolvedObjectName::ClusterReplica(name) => {
5272                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5273            }
5274            UnresolvedObjectName::Database(name) => {
5275                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5276            }
5277            UnresolvedObjectName::Schema(name) => {
5278                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5279            }
5280            UnresolvedObjectName::Role(name) => {
5281                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5282            }
5283            UnresolvedObjectName::Item(name) => {
5284                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5285                    .map(ObjectId::Item)
5286            }
5287            UnresolvedObjectName::NetworkPolicy(name) => {
5288                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5289            }
5290        };
5291        match id {
5292            Some(id) => referenced_ids.push(id),
5293            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5294                name: name.to_ast_string_simple(),
5295                object_type,
5296            }),
5297        }
5298    }
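    // Expand the explicitly referenced objects to the full set that must be
    // dropped alongside them (their transitive dependents).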
5299    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5300
5301    Ok(Plan::DropObjects(DropObjectsPlan {
5302        referenced_ids,
5303        drop_ids,
5304        object_type,
5305    }))
5306}
5307
5308fn plan_drop_schema(
5309    scx: &StatementContext,
5310    if_exists: bool,
5311    name: &UnresolvedSchemaName,
5312    cascade: bool,
5313) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5314    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5315        Some((database_spec, schema_spec)) => {
5316            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5317                sql_bail!(
5318                    "cannot drop schema {name} because it is required by the database system",
5319                );
5320            }
5321            if let SchemaSpecifier::Temporary = schema_spec {
5322                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5323            }
5324            let schema = scx.get_schema(&database_spec, &schema_spec);
5325            if !cascade && schema.has_items() {
5326                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5327                sql_bail!(
5328                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5329                    full_schema_name
5330                );
5331            }
5332            Some((database_spec, schema_spec))
5333        }
5334        None => None,
5335    })
5336}
5337
5338fn plan_drop_role(
5339    scx: &StatementContext,
5340    if_exists: bool,
5341    name: &Ident,
5342) -> Result<Option<RoleId>, PlanError> {
5343    match scx.catalog.resolve_role(name.as_str()) {
5344        Ok(role) => {
5345            let id = role.id();
5346            if &id == scx.catalog.active_role_id() {
5347                sql_bail!("current role cannot be dropped");
5348            }
5349            for role in scx.catalog.get_roles() {
5350                for (member_id, grantor_id) in role.membership() {
5351                    if &id == grantor_id {
5352                        let member_role = scx.catalog.get_role(member_id);
5353                        sql_bail!(
5354                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5355                            name.as_str(),
5356                            role.name(),
5357                            member_role.name()
5358                        );
5359                    }
5360                }
5361            }
5362            Ok(Some(role.id()))
5363        }
5364        Err(_) if if_exists => Ok(None),
5365        Err(e) => Err(e.into()),
5366    }
5367}
5368
5369fn plan_drop_cluster(
5370    scx: &StatementContext,
5371    if_exists: bool,
5372    name: &Ident,
5373    cascade: bool,
5374) -> Result<Option<ClusterId>, PlanError> {
5375    Ok(match resolve_cluster(scx, name, if_exists)? {
5376        Some(cluster) => {
5377            if !cascade && !cluster.bound_objects().is_empty() {
5378                return Err(PlanError::DependentObjectsStillExist {
5379                    object_type: "cluster".to_string(),
5380                    object_name: cluster.name().to_string(),
5381                    dependents: Vec::new(),
5382                });
5383            }
5384            Some(cluster.id())
5385        }
5386        None => None,
5387    })
5388}
5389
5390fn plan_drop_network_policy(
5391    scx: &StatementContext,
5392    if_exists: bool,
5393    name: &Ident,
5394) -> Result<Option<NetworkPolicyId>, PlanError> {
5395    match scx.catalog.resolve_network_policy(name.as_str()) {
5396        Ok(policy) => {
5397            // TODO(network_policy): When we support role-based network policies, check if any role
5398            // currently has the specified policy set.
5399            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5400                Err(PlanError::NetworkPolicyInUse)
5401            } else {
5402                Ok(Some(policy.id()))
5403            }
5404        }
5405        Err(_) if if_exists => Ok(None),
5406        Err(e) => Err(e.into()),
5407    }
5408}
5409
5410/// Returns `true` if the cluster has any object that requires a single replica.
5411/// Returns `false` if the cluster has no objects.
5412fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5413    // If this feature is enabled, then all objects support multiple replicas.
5414    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5415        false
5416    } else {
5417        // Otherwise we check for the existence of sources or sinks.
5418        cluster.bound_objects().iter().any(|id| {
5419            let item = scx.catalog.get_item(id);
5420            matches!(
5421                item.item_type(),
5422                CatalogItemType::Sink | CatalogItemType::Source
5423            )
5424        })
5425    }
5426}
5427
5428fn plan_drop_cluster_replica(
5429    scx: &StatementContext,
5430    if_exists: bool,
5431    name: &QualifiedReplica,
5432) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5433    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5434    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5435}
5436
5437/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5438fn plan_drop_item(
5439    scx: &StatementContext,
5440    object_type: ObjectType,
5441    if_exists: bool,
5442    name: UnresolvedItemName,
5443    cascade: bool,
5444) -> Result<Option<CatalogItemId>, PlanError> {
5445    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5446        Ok(r) => r,
5447        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5448        Err(PlanError::MismatchedObjectType {
5449            name,
5450            is_type: ObjectType::MaterializedView,
5451            expected_type: ObjectType::View,
5452        }) => {
5453            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5454        }
5455        e => e?,
5456    };
5457
5458    Ok(match resolved {
5459        Some(catalog_item) => {
5460            if catalog_item.id().is_system() {
5461                sql_bail!(
5462                    "cannot drop {} {} because it is required by the database system",
5463                    catalog_item.item_type(),
5464                    scx.catalog.minimal_qualification(catalog_item.name()),
5465                );
5466            }
5467
5468            if !cascade {
5469                for id in catalog_item.used_by() {
5470                    let dep = scx.catalog.get_item(id);
5471                    if dependency_prevents_drop(object_type, dep) {
5472                        return Err(PlanError::DependentObjectsStillExist {
5473                            object_type: catalog_item.item_type().to_string(),
5474                            object_name: scx
5475                                .catalog
5476                                .minimal_qualification(catalog_item.name())
5477                                .to_string(),
5478                            dependents: vec![(
5479                                dep.item_type().to_string(),
5480                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5481                            )],
5482                        });
5483                    }
5484                }
5485                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5486                //  relies on entry. Unfortunately, we don't have that information readily available.
5487            }
5488            Some(catalog_item.id())
5489        }
5490        None => None,
5491    })
5492}
5493
5494/// Does the dependency `dep` prevent a drop of a non-cascade query?
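/// For example, an index built on a view does not prevent `DROP VIEW` without
/// `CASCADE`, but a second view that reads from the view does.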
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        _ => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            CatalogItemType::Index => false,
        },
    }
}

pub fn describe_alter_index_options(
    _: &StatementContext,
    _: AlterIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_drop_owned(
    _: &StatementContext,
    _: DropOwnedStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

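/// Plans `DROP OWNED BY`, which drops all objects owned by the listed roles
/// and revokes their privileges. As an illustrative example (the role name is
/// hypothetical):
///
/// ```sql
/// DROP OWNED BY staff CASCADE;
/// ```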
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Replicas
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            // Note: CASCADE is not required for replicas.
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                // Checks if any items still depend on this one, returning an error if so.
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                // When this item gets dropped it will also drop its progress source, so we need to
                // check the users of that source as well.
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}

fn plan_retain_history_option(
    scx: &StatementContext,
    retain_history: Option<OptionalDuration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    if let Some(OptionalDuration(lcw)) = retain_history {
        Ok(Some(plan_retain_history(scx, lcw)?))
    } else {
        Ok(None)
    }
}

// Converts a specified `RETAIN HISTORY` option into a compaction window. `None` corresponds to
// `DisableCompaction`. A zero duration errors: the `OptionalDuration` type already converts a zero
// duration into `None`, so a literal zero should never reach this function. This function must not
// be called in the `RESET (RETAIN HISTORY)` path, which should be handled by the outer
// `Option<OptionalDuration>` being `None`.
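// For example (illustrative): `RETAIN HISTORY FOR '1hr'` reaches this function
// as `Some(Duration::from_secs(3600))` and becomes a one-hour compaction
// window, while `RETAIN HISTORY FOR 0` reaches it as `None` and maps to
// `CompactionWindow::DisableCompaction` (feature flags permitting).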
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
        // disable compaction), so it should never occur here. Furthermore, some things actually do
        // break when this is set to a real zero:
        // https://github.com/MaterializeInc/database-issues/issues/3798.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Error if the duration is below the default and enable_unlimited_retain_history is
            // not set (which should only be possible during testing).
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // In the past, `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction
        // seems like a bad choice, so prevent it.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        // Index options are not durable.
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

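/// Plans the `WITH` options accepted by `CREATE TABLE`. As an illustrative
/// example (names are hypothetical; `PARTITION BY` is gated behind a feature
/// flag):
///
/// ```sql
/// CREATE TABLE t (a int) WITH (PARTITION BY (a), RETAIN HISTORY FOR '1hr');
/// ```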
fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

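/// Plans `ALTER INDEX ... SET/RESET (RETAIN HISTORY ...)`. As an illustrative
/// example (the index name is hypothetical):
///
/// ```sql
/// ALTER INDEX t_idx SET (RETAIN HISTORY FOR '2 days');
/// ```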
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

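/// Plans `ALTER CLUSTER`. As illustrative examples (the cluster name and
/// option values are hypothetical):
///
/// ```sql
/// ALTER CLUSTER quickstart SET (REPLICATION FACTOR 2);
/// ALTER CLUSTER quickstart RESET (REPLICATION FACTOR);
/// ```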
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        // The total number of replicas that would be running is the internal
                        // replica count plus the replication factor.
                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // `AlterClusterPlanStrategy`s other than `None` will stand up pending
                        // replicas of the new configuration, which would violate the
                        // single-replica constraint for sources. If the cluster hosts any storage
                        // objects (sources or sinks), fail.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if alter_strategy.is_some() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long-term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

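/// Plans `ALTER ... SET CLUSTER`. As an illustrative example (names are
/// hypothetical; the statement is gated behind a feature flag):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET CLUSTER other_cluster;
/// ```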
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Prevent access to `SET CLUSTER` for unsupported objects.
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

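/// Plans `ALTER ... RENAME TO`, dispatching on the object type. As an
/// illustrative example (names are hypothetical):
///
/// ```sql
/// ALTER TABLE t RENAME TO t_archived;
/// ```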
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

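/// Plans `ALTER SCHEMA ... RENAME TO`. As an illustrative example (names are
/// hypothetical):
///
/// ```sql
/// ALTER SCHEMA staging RENAME TO archive;
/// ```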
pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Make sure the new name is unique.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    // Prevent users from renaming system-related schemas.
    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

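/// Plans `ALTER SCHEMA ... SWAP WITH ...`. As an illustrative example (names
/// are hypothetical):
///
/// ```sql
/// ALTER SCHEMA blue SWAP WITH green;
/// ```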
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // We cannot swap system schemas.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary name we can swap schema_a to.
    //
    // 'check' returns whether the temp schema name would be valid.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `ALTER VIEW <materialized-view> RENAME`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // For PostgreSQL compatibility, items and types cannot have
            // overlapping names in a variety of situations. See the comment on
            // `CatalogItemType::conflicts_with_type` for details.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

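/// Plans the cluster variant of `ALTER ... SWAP WITH ...`. As an illustrative
/// example (names are hypothetical):
///
/// ```sql
/// ALTER CLUSTER blue SWAP WITH green;
/// ```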
pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    let check = |temp_suffix: &str| {
        // Check the name we will actually use below, `mz_cluster_swap_<suffix>`.
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // Temp name does not exist, so we can use it.
            Err(CatalogError::UnknownCluster(_)) => true,
            // Temp name already exists!
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

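/// Plans `ALTER CLUSTER REPLICA ... RENAME TO`, which is only permitted on
/// unmanaged clusters. As an illustrative example (names are hypothetical):
///
/// ```sql
/// ALTER CLUSTER REPLICA prod.r1 RENAME TO r1_old;
/// ```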
pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // We'll try 10 times to generate a temporary suffix.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            // Call the provided closure to make sure this name is unique!
            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

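/// Plans `ALTER ... SET (RETAIN HISTORY ...)` and its `RESET` counterpart. As
/// an illustrative example (the object name is hypothetical):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '1hr');
/// ```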
pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            // View gets a special error below.
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // Save the original value so we can write it back down in the create_sql catalog item.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                // None is RESET, so use the default compaction window.
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

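/// Plans `ALTER SECRET ... AS`. As an illustrative example (the name and value
/// are hypothetical):
///
/// ```sql
/// ALTER SECRET upstream_password AS 'new-password';
/// ```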
pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

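/// Plans `ALTER CONNECTION`. As illustrative examples (connection names and
/// option values are hypothetical; `VALIDATE` is gated behind a feature flag):
///
/// ```sql
/// ALTER CONNECTION pg_conn SET (PORT 5433) WITH (VALIDATE = true);
/// ALTER CONNECTION ssh_conn ROTATE KEYS;
/// ```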
6720pub fn plan_alter_connection(
6721    scx: &StatementContext,
6722    stmt: AlterConnectionStatement<Aug>,
6723) -> Result<Plan, PlanError> {
6724    let AlterConnectionStatement {
6725        name,
6726        if_exists,
6727        actions,
6728        with_options,
6729    } = stmt;
6730    let conn_name = normalize::unresolved_item_name(name)?;
6731    let entry = match scx.catalog.resolve_item(&conn_name) {
6732        Ok(entry) => entry,
6733        Err(_) if if_exists => {
6734            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6735                name: conn_name.to_string(),
6736                object_type: ObjectType::Sink,
6737            });
6738
6739            return Ok(Plan::AlterNoop(AlterNoopPlan {
6740                object_type: ObjectType::Connection,
6741            }));
6742        }
6743        Err(e) => return Err(e.into()),
6744    };
6745
6746    let connection = entry.connection()?;
6747
6748    if actions
6749        .iter()
6750        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6751    {
6752        if actions.len() > 1 {
6753            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6754        }
6755
6756        if !with_options.is_empty() {
6757            sql_bail!(
6758                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6759                with_options
6760                    .iter()
6761                    .map(|o| o.to_ast_string_simple())
6762                    .join(", ")
6763            );
6764        }
6765
6766        if !matches!(connection, Connection::Ssh(_)) {
6767            sql_bail!(
6768                "{} is not an SSH connection",
6769                scx.catalog.resolve_full_name(entry.name())
6770            )
6771        }
6772
6773        return Ok(Plan::AlterConnection(AlterConnectionPlan {
6774            id: entry.id(),
6775            action: crate::plan::AlterConnectionAction::RotateKeys,
6776        }));
6777    }
6778
6779    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6780    if options.validate.is_some() {
6781        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6782    }
6783
6784    let validate = match options.validate {
6785        Some(val) => val,
6786        None => {
6787            scx.catalog
6788                .system_vars()
6789                .enable_default_connection_validation()
6790                && connection.validate_by_default()
6791        }
6792    };
6793
6794    let connection_type = match connection {
6795        Connection::Aws(_) => CreateConnectionType::Aws,
6796        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6797        Connection::Kafka(_) => CreateConnectionType::Kafka,
6798        Connection::Csr(_) => CreateConnectionType::Csr,
6799        Connection::Postgres(_) => CreateConnectionType::Postgres,
6800        Connection::Ssh(_) => CreateConnectionType::Ssh,
6801        Connection::MySql(_) => CreateConnectionType::MySql,
6802        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6803        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
6804    };
6805
6806    // Collect all options irrespective of action taken on them.
6807    let specified_options: BTreeSet<_> = actions
6808        .iter()
6809        .map(|action: &AlterConnectionAction<Aug>| match action {
6810            AlterConnectionAction::SetOption(option) => option.name.clone(),
6811            AlterConnectionAction::DropOption(name) => name.clone(),
6812            AlterConnectionAction::RotateKeys => unreachable!(),
6813        })
6814        .collect();
6815
6816    for invalid in INALTERABLE_OPTIONS {
6817        if specified_options.contains(invalid) {
6818            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6819        }
6820    }
6821
6822    connection::validate_options_per_connection_type(connection_type, specified_options)?;
6823
6824    // Partition operations into set and drop
6825    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
6826        actions.into_iter().partition_map(|action| match action {
6827            AlterConnectionAction::SetOption(option) => Either::Left(option),
6828            AlterConnectionAction::DropOption(name) => Either::Right(name),
6829            AlterConnectionAction::RotateKeys => unreachable!(),
6830        });
6831
6832    let set_options: BTreeMap<_, _> = set_options_vec
6833        .clone()
6834        .into_iter()
6835        .map(|option| (option.name, option.value))
6836        .collect();
6837
6838    // Type check values + avoid duplicates; we don't want to e.g. let users
6839    // drop and set the same option in the same statement, so treating drops as
6840    // sets here is fine.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow setting _and_ resetting mutually exclusive options
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option is either set or dropped, ensure all mutually
        // exclusive options are dropped. We do this "behind the scenes", even
        // though we disallow users from performing the same action, because
        // this is the mechanism by which we overwrite values elsewhere in the
        // code.
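        //
        // For example, if BROKER and BROKERS were declared mutually exclusive
        // (option names illustrative), `ALTER CONNECTION conn SET (BROKER
        // 'b:9092')` would implicitly drop any previously stored BROKERS
        // value before the new BROKER value is applied.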
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

        // n.b. if mutually exclusive options are set, those will error when we
        // try to replan the connection.
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

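/// Plans an `ALTER SINK` statement.
///
/// For the change-relation action (illustrative syntax, along the lines of
/// `ALTER SINK my_sink SET FROM new_relation`), planning proceeds by
/// reconstructing the sink's original `CREATE SINK` statement, bumping its
/// internal version option, swapping in the new `FROM` relation, and
/// re-planning the result to verify that the new configuration is valid.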
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always ALTER objects from their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // First we reconstruct the original CREATE SINK statement
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(mut stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Then we extract (and remove) any existing version options and increase the maximum by one
            let cur_version = stmt
                .with_options
                .extract_if(.., |o| o.name == CreateSinkOptionName::Version)
                .map(|o| u64::try_from_value(o.value).expect("invalid sink create_sql"))
                .max()
                .unwrap_or(0);
            let new_version = cur_version + 1;
            stmt.with_options.push(CreateSinkOption {
                name: CreateSinkOptionName::Version,
                value: Some(WithOptionValue::Value(Value::Number(
                    new_version.to_string(),
                ))),
            });

            // Then resolve the statement and swap its FROM relation for the new one
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            // Finally re-plan the modified create sink statement to verify the new configuration is valid
            let Plan::CreateSink(plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // TODO: should the source options be included in this description?
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

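/// Plans an `ALTER SOURCE` statement.
///
/// Only the `RETAIN HISTORY` option may be set or reset here (illustrative
/// syntax, along the lines of `ALTER SOURCE my_source SET (RETAIN HISTORY FOR
/// '30d')`); all other actions are either rejected outright or are expected
/// to have been handled during purification.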
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            // n.b. we use this statement in purification in a way that cannot be
            // planned directly.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

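/// Plans an `ALTER SYSTEM SET` statement, e.g.
/// `ALTER SYSTEM SET max_connections = 100` (variable name illustrative),
/// which updates the named system configuration variable.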
pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

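/// Plans an `ALTER ROLE` statement, which either updates the role's
/// attributes or sets/resets a role-level configuration variable
/// (illustrative syntax, along the lines of
/// `ALTER ROLE dev SET cluster = 'quickstart'`).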
pub fn plan_alter_role(
    _scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

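/// Plans an `ALTER TABLE ... ADD COLUMN` statement (illustrative syntax,
/// along the lines of `ALTER TABLE t ADD COLUMN IF NOT EXISTS c text`).
///
/// The new column is added, as nullable, to the latest version of the table;
/// the feature is gated behind `ENABLE_ALTER_TABLE_ADD_COLUMN`.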
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                // Always add columns to the latest version of the item.
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // TODO(alter_table): Support non-nullable columns with default values.
    let column_type = scalar_type.nullable(true);
    // "unresolve" our data type so we can later update the persisted create_sql.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

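/// Plans a `COMMENT ON` statement, e.g. `COMMENT ON TABLE t IS 'docs'`,
/// resolving the commented-on object (and, for column comments, the column's
/// 1-based position) and enforcing the maximum comment length.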
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // TODO(parkmycar): Make max comment length configurable.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // Note: the `mz_comments` table uses an `Int4` for the column position,
    // but in the catalog storage we store a `usize`, which would be a `UInt8`.
    // We check that the conversion is possible here because it's the easiest
    // place to raise an error.
    //
    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

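/// Resolves the named cluster, returning `Ok(None)` instead of an error when
/// the cluster does not exist and `if_exists` is true, so callers can plan a
/// no-op.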
pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

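/// Resolves the named cluster replica to its cluster and replica ID.
/// Resolution failures map to `Ok(None)` when `if_exists` is true; a cluster
/// that exists but lacks the named replica otherwise produces a dedicated
/// error.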
pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

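/// Resolves the named database, returning `Ok(None)` instead of an error when
/// the database does not exist and `if_exists` is true.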
pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

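/// Resolves the named schema to its database and schema specifiers, returning
/// `Ok(None)` instead of an error when the schema does not exist and
/// `if_exists` is true.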
pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

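/// Resolves the named network policy, returning `Ok(None)` instead of an
/// error when the policy does not exist and `if_exists` is true.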
pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

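/// Resolves an item by name, consulting the type namespace when `object_type`
/// is `ObjectType::Type` and the item namespace otherwise. The resolved
/// item's type must match `object_type`; resolution failures map to
/// `Ok(None)` when `if_exists` is true.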
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns an error if the given cluster is a managed cluster.
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}