mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_ore::vec::VecExt;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, ColumnType, RelationDesc, RelationType, RelationVersion,
    RelationVersionSelector, ScalarType, Timestamp, VersionedRelationDesc, preserves_order,
    strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, SinkEnvelope,
    StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::SqlServerSourceExportDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSource, SqlServerSourceExtras,
    Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    Ingestion, MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction,
    NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice, PolicyAddress, QueryContext,
    ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View,
    WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal,
    plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
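
// For illustration: the pattern above matches managed replica names such as
// `r1` or `r42` (`r` followed by one or more digits), but not `r` on its own
// or names like `replica1`.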

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns have types with a meaningful Persist-level ordering.
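///
/// For example, for a relation with columns `(a, b)`, `PARTITION BY (a)` and
/// `PARTITION BY (a, b)` satisfy the prefix requirement, while `PARTITION BY (b)`
/// does not. (An illustrative sketch; the listed columns must also have
/// order-preserving types.)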
fn check_partition_by(desc: &RelationDesc, partition_by: Vec<Ident>) -> Result<(), PlanError> {
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc.iter().zip(partition_by.into_iter()).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = RelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
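
// Each `generate_extracted_config!` invocation above expands to a
// `<Name>OptionExtracted` struct with one typed field per option plus a `seen`
// set recording which options were explicitly provided, along with a `TryFrom`
// conversion from the raw option list. A sketch of how the planner consumes
// one (mirroring its use in `plan_create_source` below):
//
//     let CreateSourceOptionExtracted {
//         timestamp_interval,
//         retain_history,
//         seen: _,
//     } = CreateSourceOptionExtracted::try_from(with_options.clone())?;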

pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
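    // An illustrative statement handled by this planner (names are
    // hypothetical):
    //
    //     CREATE SOURCE my_webhook IN CLUSTER my_cluster FROM WEBHOOK
    //         BODY FORMAT JSON
    //         INCLUDE HEADERS;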
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources && in_cluster.replica_ids().len() > 1 {
        sql_bail!("cannot create webhook source in cluster with more than one replica")
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost certainly wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        ColumnType {
            scalar_type: ScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(ColumnType {
            scalar_type: ScalarType::Map {
                value_type: Box::new(ScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(ScalarType::Bytes)
            .unwrap_or(ScalarType::String);
        column_ty.push(ColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }
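    // For example (illustrative): `INCLUDE HEADER 'x-signature' AS sig BYTES`
    // maps that header to a nullable `sig` column, using a bytes column type
    // because of the `BYTES` modifier.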

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = RelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
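    // An illustrative statement handled by this planner (names are
    // hypothetical):
    //
    //     CREATE SOURCE kafka_src
    //     IN CLUSTER ingest
    //     FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
    //     FORMAT JSON;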
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all the options related to the
    // primary source output must be unset.
    if force_source_table_syntax
        && (envelope.is_some() || format.is_some() || !include_metadata.is_empty())
    {
        return Err(PlanError::UseTablesForSources(
            "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
        ));
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    let external_connection = match source_connection {
        CreateSourceConnection::Kafka {
            connection: connection_name,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection_name)?;
            if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
                sql_bail!(
                    "{} is not a kafka connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let KafkaSourceConfigOptionExtracted {
                group_id_prefix,
                topic,
                topic_metadata_refresh_interval,
                start_timestamp: _, // purified into `start_offset`
                start_offset,
                seen: _,
            }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;

            let topic = topic.expect("validated exists during purification");

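            // `START OFFSET` is positional: for example, `START OFFSET (0, 5)`
            // assigns offset 0 to partition 0 and offset 5 to partition 1 (the
            // list index is the partition id).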
            let mut start_offsets = BTreeMap::new();
            if let Some(offsets) = start_offset {
                for (part, offset) in offsets.iter().enumerate() {
                    if *offset < 0 {
                        sql_bail!("START OFFSET must be a nonnegative integer");
                    }
                    start_offsets.insert(i32::try_from(part)?, *offset);
                }
            }

            if !start_offsets.is_empty() && envelope.requires_all_input() {
                sql_bail!("START OFFSET is not supported with ENVELOPE {}", envelope)
            }

            if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
                // This is a librdkafka-enforced restriction that, if violated,
                // would result in a runtime error for the source.
                sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
            }

            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                // TODO(guswynn): should this be `bail_unsupported!`?
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // This defines the metadata columns for the primary export of this kafka source,
            // not any tables that are created from it.
            // TODO: Remove this when we stop outputting to the primary export of a source.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // handled below
                        None
                    }
                })
                .collect();

            let connection = KafkaSourceConnection::<ReferencedConnection> {
                connection: connection_item.id(),
                connection_id: connection_item.id(),
                topic,
                start_offsets,
                group_id_prefix,
                topic_metadata_refresh_interval,
                metadata_columns,
            };

            GenericSourceConnection::Kafka(connection)
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        }
        | CreateSourceConnection::Yugabyte {
            connection,
            options,
        } => {
            let (source_flavor, flavor_name) = match source_connection {
                CreateSourceConnection::Postgres { .. } => (PostgresFlavor::Vanilla, "PostgreSQL"),
                CreateSourceConnection::Yugabyte { .. } => (PostgresFlavor::Yugabyte, "Yugabyte"),
                _ => unreachable!(),
            };

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection_mismatch = match connection_item.connection()? {
                Connection::Postgres(conn) => source_flavor != conn.flavor,
                _ => true,
            };
            if connection_mismatch {
                sql_bail!(
                    "{} is not a {flavor_name} connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let PgConfigOptionExtracted {
                details,
                publication,
                // text columns are already part of the source-exports and are only included
                // in these options for round-tripping of a `CREATE SOURCE` statement. This should
                // be removed once we drop support for implicitly created subsources.
                text_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details = ProtoPostgresSourcePublicationDetails::decode(&*details)
                .map_err(|e| sql_err!("{}", e))?;

            let publication_details = PostgresSourcePublicationDetails::from_proto(details)
                .map_err(|e| sql_err!("{}", e))?;

            GenericSourceConnection::<ReferencedConnection>::from(PostgresSourceConnection {
                connection: connection_item.id(),
                connection_id: connection_item.id(),
                publication: publication.expect("validated exists during purification"),
                publication_details,
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options: _,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::SqlServer(connection) => connection,
                _ => sql_bail!(
                    "{} is not a SQL Server connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };
            // TODO(sql_server2): Handle SQL Server connection options.

            let extras = SqlServerSourceExtras {};
            GenericSourceConnection::<ReferencedConnection>::from(SqlServerSource {
                catalog_id: connection_item.id(),
                connection: connection_item.id(),
                extras,
            })
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::MySql(connection) => connection,
                _ => sql_bail!(
                    "{} is not a MySQL connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };
            let MySqlConfigOptionExtracted {
                details,
                // text/exclude columns are already part of the source-exports and are only included
                // in these options for round-tripping of a `CREATE SOURCE` statement. This should
                // be removed once we drop support for implicitly created subsources.
                text_columns: _,
                exclude_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details =
                ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
            let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

            GenericSourceConnection::<ReferencedConnection>::from(MySqlSourceConnection {
                connection: connection_item.id(),
                connection_id: connection_item.id(),
                details,
            })
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(scx, generator, options, include_metadata)?;

            let LoadGeneratorOptionExtracted {
                tick_interval,
                as_of,
                up_to,
                ..
            } = options.clone().try_into()?;
            let tick_micros = match tick_interval {
                Some(interval) => Some(interval.as_micros().try_into()?),
                None => None,
            };

            if up_to < as_of {
                sql_bail!("UP TO cannot be less than AS OF");
            }

            GenericSourceConnection::from(LoadGeneratorSourceConnection {
                load_generator,
                tick_micros,
                as_of,
                up_to,
            })
        }
    };

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }
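    // For illustration (hypothetical syntax sketch): a statement containing
    // `... PRIMARY KEY (id) NOT ENFORCED ...` declares `id` as an unenforced
    // key, gated behind the feature flag checked above.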

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (primary_export, primary_export_details) = if force_source_table_syntax {
        (
            SourceExportDataConfig {
                encoding: None,
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
            },
            SourceExportDetails::None,
        )
    } else {
        (
            SourceExportDataConfig {
                encoding,
                envelope: envelope.clone(),
            },
            external_connection.primary_export_details(),
        )
    };
    let source_desc = SourceDesc::<ReferencedConnection> {
        connection: external_connection,
        // We only define primary-export details for this source if we are still supporting
        // the legacy source syntax. Otherwise, we will not output to the primary collection.
        // TODO(database-issues#8620): Remove this field once the new syntax is enabled everywhere
        primary_export,
        primary_export_details,
        timestamp_interval,
    };

    let progress_subsource = match progress_subsource {
        Some(name) => match name {
            DeferredItemName::Named(name) => match name {
                ResolvedItemName::Item { id, .. } => *id,
                ResolvedItemName::Cte { .. }
                | ResolvedItemName::ContinualTask { .. }
                | ResolvedItemName::Error => {
                    sql_bail!("[internal error] invalid target id")
                }
            },
            DeferredItemName::Deferred(_) => {
                sql_bail!("[internal error] progress subsource must be named during purification")
            }
        },
        _ => sql_bail!("[internal error] progress subsource must be named during purification"),
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source: DataSourceDesc::Ingestion(Ingestion {
            desc: source_desc,
            progress_subsource,
        }),
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1250
1251fn apply_source_envelope_encoding(
1252    scx: &StatementContext,
1253    envelope: &ast::SourceEnvelope,
1254    format: &Option<FormatSpecifier<Aug>>,
1255    key_desc: Option<RelationDesc>,
1256    value_desc: RelationDesc,
1257    include_metadata: &[SourceIncludeMetadata],
1258    metadata_columns_desc: Vec<(&str, ColumnType)>,
1259    source_connection: &GenericSourceConnection<ReferencedConnection>,
1260) -> Result<
1261    (
1262        RelationDesc,
1263        SourceEnvelope,
1264        Option<SourceDataEncoding<ReferencedConnection>>,
1265    ),
1266    PlanError,
1267> {
1268    let encoding = match format {
1269        Some(format) => Some(get_encoding(scx, format, envelope)?),
1270        None => None,
1271    };
1272
1273    let (key_desc, value_desc) = match &encoding {
1274        Some(encoding) => {
1275            // If we are applying an encoding we need to ensure that the incoming value_desc is a
1276            // single column of type bytes.
1277            match value_desc.typ().columns() {
1278                [typ] => match typ.scalar_type {
1279                    ScalarType::Bytes => {}
1280                    _ => sql_bail!(
1281                        "The schema produced by the source is incompatible with format decoding"
1282                    ),
1283                },
1284                _ => sql_bail!(
1285                    "The schema produced by the source is incompatible with format decoding"
1286                ),
1287            }
1288
1289            let (key_desc, value_desc) = encoding.desc()?;
1290
1291            // TODO(petrosagg): This piece of code seems to be making a statement about the
1292            // nullability of the NONE envelope when the source is Kafka. As written, the code
1293            // misses opportunities to mark columns as not nullable and is over conservative. For
1294            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
1295            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
1296            // should be replaced with precise type-level reasoning.
1297            let key_desc = key_desc.map(|desc| {
1298                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1299                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1300                if is_kafka && is_envelope_none {
1301                    RelationDesc::from_names_and_types(
1302                        desc.into_iter()
1303                            .map(|(name, typ)| (name, typ.nullable(true))),
1304                    )
1305                } else {
1306                    desc
1307                }
1308            });
1309            (key_desc, value_desc)
1310        }
1311        None => (key_desc, value_desc),
1312    };
1313
1314    // KEY VALUE load generators are the only UPSERT sources that
1315    // have no encoding but default to `INCLUDE KEY`.
1316    //
1317    // As discussed in
1318    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1319    // removing this special case amounts to deciding how to handle null keys
1320    // from sources in a holistic way. We aren't yet prepared to do this, so we
1321    // leave this special case in.
1322    //
1323    // Note that this is safe because this generator:
1324    // 1. Is the only source with no encoding that can have its key included.
1325    // 2. Never produces null keys (or values, for that matter).
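    //
    // For illustration only (options elided), such a source looks like:
    //   CREATE SOURCE kv FROM LOAD GENERATOR KEY VALUE (KEYS 16, ...) INCLUDE KEY;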
1326    let key_envelope_no_encoding = matches!(
1327        source_connection,
1328        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1329            load_generator: LoadGenerator::KeyValue(_),
1330            ..
1331        })
1332    );
1333    let mut key_envelope = get_key_envelope(
1334        include_metadata,
1335        encoding.as_ref(),
1336        key_envelope_no_encoding,
1337    )?;
1338
1339    match (&envelope, &key_envelope) {
1340        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1341        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1342            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1343        ),
1344        _ => {}
1345    };
1346
1347    // Not all source envelopes are compatible with all source connections.
1348    // Whoever constructs the source ingestion pipeline is responsible for
1349    // choosing compatible envelopes and connections.
1350    //
1351    // TODO(guswynn): unambiguously assert which connections and envelopes are
1352    // compatible in typechecking
1353    //
1354    // TODO: remove bails as more support for upsert is added.
1355    let envelope = match &envelope {
1356        // TODO: fixup key envelope
1357        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1358        ast::SourceEnvelope::Debezium => {
1359            // TODO: check that key envelope is not set
1360            let after_idx = match typecheck_debezium(&value_desc) {
1361                Ok((_before_idx, after_idx)) => Ok(after_idx),
1362                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1363                    Some(DataEncoding::Avro(_)) => Err(type_err),
1364                    _ => Err(sql_err!(
1365                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1366                    )),
1367                },
1368            }?;
1369
1370            UnplannedSourceEnvelope::Upsert {
1371                style: UpsertStyle::Debezium { after_idx },
1372            }
1373        }
1374        ast::SourceEnvelope::Upsert {
1375            value_decode_err_policy,
1376        } => {
1377            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1378                None => {
1379                    if !key_envelope_no_encoding {
1380                        bail_unsupported!(format!(
1381                            "UPSERT requires a key/value format: {:?}",
1382                            format
1383                        ))
1384                    }
1385                    None
1386                }
1387                Some(key_encoding) => Some(key_encoding),
1388            };
1389            // `ENVELOPE UPSERT` implies `INCLUDE KEY` if it is not explicitly
1390            // specified.
1391            if key_envelope == KeyEnvelope::None {
1392                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1393            }
1394            // If the value decode error policy is not set, we use the default upsert style.
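            // (Illustrative, syntax hedged: the inline policy corresponds to SQL along
            // the lines of `ENVELOPE UPSERT (VALUE DECODING ERRORS = INLINE)`, with an
            // optional alias for the resulting error column.)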
1395            let style = match value_decode_err_policy.as_slice() {
1396                [] => UpsertStyle::Default(key_envelope),
1397                [SourceErrorPolicy::Inline { alias }] => {
1398                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1399                    UpsertStyle::ValueErrInline {
1400                        key_envelope,
1401                        error_column: alias
1402                            .as_ref()
1403                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1404                    }
1405                }
1406                _ => {
1407                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1408                }
1409            };
1410
1411            UnplannedSourceEnvelope::Upsert { style }
1412        }
1413        ast::SourceEnvelope::CdcV2 => {
1414            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1415            // TODO: check that key envelope is not set
1416            match format {
1417                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1418                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1419            }
1420            UnplannedSourceEnvelope::CdcV2
1421        }
1422    };
1423
1424    let metadata_desc = included_column_desc(metadata_columns_desc);
1425    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1426
1427    Ok((desc, envelope, encoding))
1428}
1429
1430/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1431/// of columns and constraints.
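///
/// For example (illustrative), columns `(a int NOT NULL, b text)` together with a
/// `PRIMARY KEY (a)` constraint yield a desc in which `a` is marked non-nullable
/// and `[a]` is inserted as the first key.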
1432fn plan_source_export_desc(
1433    scx: &StatementContext,
1434    name: &UnresolvedItemName,
1435    columns: &Vec<ColumnDef<Aug>>,
1436    constraints: &Vec<TableConstraint<Aug>>,
1437) -> Result<RelationDesc, PlanError> {
1438    let names: Vec<_> = columns
1439        .iter()
1440        .map(|c| normalize::column_name(c.name.clone()))
1441        .collect();
1442
1443    if let Some(dup) = names.iter().duplicates().next() {
1444        sql_bail!("column {} specified more than once", dup.quoted());
1445    }
1446
1447    // Build initial relation type that handles declared data types
1448    // and NOT NULL constraints.
1449    let mut column_types = Vec::with_capacity(columns.len());
1450    let mut keys = Vec::new();
1451
1452    for (i, c) in columns.into_iter().enumerate() {
1453        let aug_data_type = &c.data_type;
1454        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1455        let mut nullable = true;
1456        for option in &c.options {
1457            match &option.option {
1458                ColumnOption::NotNull => nullable = false,
1459                ColumnOption::Default(_) => {
1460                    bail_unsupported!("Source export with default value")
1461                }
1462                ColumnOption::Unique { is_primary } => {
1463                    keys.push(vec![i]);
1464                    if *is_primary {
1465                        nullable = false;
1466                    }
1467                }
1468                other => {
1469                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1470                }
1471            }
1472        }
1473        column_types.push(ty.nullable(nullable));
1474    }
1475
1476    let mut seen_primary = false;
1477    'c: for constraint in constraints {
1478        match constraint {
1479            TableConstraint::Unique {
1480                name: _,
1481                columns,
1482                is_primary,
1483                nulls_not_distinct,
1484            } => {
1485                if seen_primary && *is_primary {
1486                    sql_bail!(
1487                        "multiple primary keys for source export {} are not allowed",
1488                        name.to_ast_string_stable()
1489                    );
1490                }
1491                seen_primary = *is_primary || seen_primary;
1492
1493                let mut key = vec![];
1494                for column in columns {
1495                    let column = normalize::column_name(column.clone());
1496                    match names.iter().position(|name| *name == column) {
1497                        None => sql_bail!("unknown column in constraint: {}", column),
1498                        Some(i) => {
1499                            let nullable = &mut column_types[i].nullable;
1500                            if *is_primary {
1501                                if *nulls_not_distinct {
1502                                    sql_bail!(
1503                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1504                                    );
1505                                }
1506                                *nullable = false;
1507                            } else if !(*nulls_not_distinct || !*nullable) {
1508                                // Non-primary key unique constraints are only keys if all of their
1509                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1510                                break 'c;
1511                            }
1512
1513                            key.push(i);
1514                        }
1515                    }
1516                }
1517
1518                if *is_primary {
1519                    keys.insert(0, key);
1520                } else {
1521                    keys.push(key);
1522                }
1523            }
1524            TableConstraint::ForeignKey { .. } => {
1525                bail_unsupported!("Source export with a foreign key")
1526            }
1527            TableConstraint::Check { .. } => {
1528                bail_unsupported!("Source export with a check constraint")
1529            }
1530        }
1531    }
1532
1533    let typ = RelationType::new(column_types).with_keys(keys);
1534    let desc = RelationDesc::new(typ, names);
1535    Ok(desc)
1536}
1537
1538generate_extracted_config!(
1539    CreateSubsourceOption,
1540    (Progress, bool, Default(false)),
1541    (ExternalReference, UnresolvedItemName),
1542    (TextColumns, Vec::<Ident>, Default(vec![])),
1543    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1544    (Details, String)
1545);
1546
1547pub fn plan_create_subsource(
1548    scx: &StatementContext,
1549    stmt: CreateSubsourceStatement<Aug>,
1550) -> Result<Plan, PlanError> {
1551    let CreateSubsourceStatement {
1552        name,
1553        columns,
1554        of_source,
1555        constraints,
1556        if_not_exists,
1557        with_options,
1558    } = &stmt;
1559
1560    let CreateSubsourceOptionExtracted {
1561        progress,
1562        external_reference,
1563        text_columns,
1564        exclude_columns,
1565        details,
1566        seen: _,
1567    } = with_options.clone().try_into()?;
1568
1569    // This invariant is enforced during purification; we are responsible for
1570    // creating the AST for subsources as a response to CREATE SOURCE
1571    // statements, so this would fire in integration testing if we failed to
1572    // uphold it.
1573    assert!(
1574        progress ^ (external_reference.is_some() && of_source.is_some()),
1575        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1576    );
1577
1578    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1579
1580    let data_source = if let Some(source_reference) = of_source {
1581        // If the new source table syntax is forced, we should not create any
1582        // non-progress subsources.
1583        if scx.catalog.system_vars().enable_create_table_from_source()
1584            && scx.catalog.system_vars().force_source_table_syntax()
1585        {
1586            Err(PlanError::UseTablesForSources(
1587                "CREATE SUBSOURCE".to_string(),
1588            ))?;
1589        }
1590
1591        // This is a subsource with the "natural" dependency order, i.e. it is
1592        // not a legacy subsource with the inverted structure.
1593        let ingestion_id = *source_reference.item_id();
1594        let external_reference = external_reference.unwrap();
1595
1596        // Decode the details option stored on the subsource statement, which contains information
1597        // created during the purification process.
1598        let details = details
1599            .as_ref()
1600            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1601        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1602        let details =
1603            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1604        let details =
1605            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1606        let details = match details {
1607            SourceExportStatementDetails::Postgres { table } => {
1608                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1609                    column_casts: crate::pure::postgres::generate_column_casts(
1610                        scx,
1611                        &table,
1612                        &text_columns,
1613                    )?,
1614                    table,
1615                })
1616            }
1617            SourceExportStatementDetails::MySql {
1618                table,
1619                initial_gtid_set,
1620            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1621                table,
1622                initial_gtid_set,
1623                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1624                exclude_columns: exclude_columns
1625                    .into_iter()
1626                    .map(|c| c.into_string())
1627                    .collect(),
1628            }),
1629            SourceExportStatementDetails::SqlServer {
1630                table,
1631                capture_instance,
1632            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1633                capture_instance,
1634                table,
1635                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1636                exclude_columns: exclude_columns
1637                    .into_iter()
1638                    .map(|c| c.into_string())
1639                    .collect(),
1640            }),
1641            SourceExportStatementDetails::LoadGenerator { output } => {
1642                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1643            }
1644            SourceExportStatementDetails::Kafka {} => {
1645                bail_unsupported!("subsources cannot reference Kafka sources")
1646            }
1647        };
1648        DataSourceDesc::IngestionExport {
1649            ingestion_id,
1650            external_reference,
1651            details,
1652            // Subsources don't currently support non-default envelopes / encoding
1653            data_config: SourceExportDataConfig {
1654                envelope: SourceEnvelope::None(NoneEnvelope {
1655                    key_envelope: KeyEnvelope::None,
1656                    key_arity: 0,
1657                }),
1658                encoding: None,
1659            },
1660        }
1661    } else if progress {
1662        DataSourceDesc::Progress
1663    } else {
1664        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1665    };
1666
1667    let if_not_exists = *if_not_exists;
1668    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1669
1670    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1671
1672    let source = Source {
1673        create_sql,
1674        data_source,
1675        desc,
1676        compaction_window: None,
1677    };
1678
1679    Ok(Plan::CreateSource(CreateSourcePlan {
1680        name,
1681        source,
1682        if_not_exists,
1683        timeline: Timeline::EpochMilliseconds,
1684        in_cluster: None,
1685    }))
1686}
1687
1688generate_extracted_config!(
1689    TableFromSourceOption,
1690    (TextColumns, Vec::<Ident>, Default(vec![])),
1691    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1692    (PartitionBy, Vec<Ident>),
1693    (Details, String)
1694);
1695
1696pub fn plan_create_table_from_source(
1697    scx: &StatementContext,
1698    stmt: CreateTableFromSourceStatement<Aug>,
1699) -> Result<Plan, PlanError> {
1700    if !scx.catalog.system_vars().enable_create_table_from_source() {
1701        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1702    }
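
    // A typical statement planned here looks something like (syntax hedged):
    //   CREATE TABLE t FROM SOURCE my_source (REFERENCE upstream_schema.upstream_table);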
1703
1704    let CreateTableFromSourceStatement {
1705        name,
1706        columns,
1707        constraints,
1708        if_not_exists,
1709        source,
1710        external_reference,
1711        envelope,
1712        format,
1713        include_metadata,
1714        with_options,
1715    } = &stmt;
1716
1717    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1718
1719    let TableFromSourceOptionExtracted {
1720        text_columns,
1721        exclude_columns,
1722        partition_by,
1723        details,
1724        seen: _,
1725    } = with_options.clone().try_into()?;
1726
1727    let source_item = scx.get_item_by_resolved_name(source)?;
1728    let ingestion_id = source_item.id();
1729
1730    // Decode the details option stored on the statement, which contains information
1731    // created during the purification process.
1732    let details = details
1733        .as_ref()
1734        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1735    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1736    let details =
1737        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1738    let details =
1739        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1740
1741    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1742        && include_metadata
1743            .iter()
1744            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1745    {
1746        // TODO(guswynn): should this be `bail_unsupported!`?
1747        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1748    }
1749    if !matches!(
1750        details,
1751        SourceExportStatementDetails::Kafka { .. }
1752            | SourceExportStatementDetails::LoadGenerator { .. }
1753    ) && !include_metadata.is_empty()
1754    {
1755        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1756    }
1757
1758    let details = match details {
1759        SourceExportStatementDetails::Postgres { table } => {
1760            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1761                column_casts: crate::pure::postgres::generate_column_casts(
1762                    scx,
1763                    &table,
1764                    &text_columns,
1765                )?,
1766                table,
1767            })
1768        }
1769        SourceExportStatementDetails::MySql {
1770            table,
1771            initial_gtid_set,
1772        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1773            table,
1774            initial_gtid_set,
1775            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1776            exclude_columns: exclude_columns
1777                .into_iter()
1778                .map(|c| c.into_string())
1779                .collect(),
1780        }),
1781        SourceExportStatementDetails::SqlServer {
1782            table,
1783            capture_instance,
1784        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1785            table,
1786            capture_instance,
1787            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1788            exclude_columns: exclude_columns
1789                .into_iter()
1790                .map(|c| c.into_string())
1791                .collect(),
1792        }),
1793        SourceExportStatementDetails::LoadGenerator { output } => {
1794            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1795        }
1796        SourceExportStatementDetails::Kafka {} => {
1797            if !include_metadata.is_empty()
1798                && !matches!(
1799                    envelope,
1800                    ast::SourceEnvelope::Upsert { .. }
1801                        | ast::SourceEnvelope::None
1802                        | ast::SourceEnvelope::Debezium
1803                )
1804            {
1805                // TODO(guswynn): should this be `bail_unsupported!`?
1806                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1807            }
1808
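            // Map each INCLUDE item to a Kafka metadata column; e.g. (illustrative)
            // `INCLUDE TIMESTAMP AS ts, PARTITION, OFFSET` yields columns named
            // "ts", "partition", and "offset".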
1809            let metadata_columns = include_metadata
1810                .into_iter()
1811                .flat_map(|item| match item {
1812                    SourceIncludeMetadata::Timestamp { alias } => {
1813                        let name = match alias {
1814                            Some(name) => name.to_string(),
1815                            None => "timestamp".to_owned(),
1816                        };
1817                        Some((name, KafkaMetadataKind::Timestamp))
1818                    }
1819                    SourceIncludeMetadata::Partition { alias } => {
1820                        let name = match alias {
1821                            Some(name) => name.to_string(),
1822                            None => "partition".to_owned(),
1823                        };
1824                        Some((name, KafkaMetadataKind::Partition))
1825                    }
1826                    SourceIncludeMetadata::Offset { alias } => {
1827                        let name = match alias {
1828                            Some(name) => name.to_string(),
1829                            None => "offset".to_owned(),
1830                        };
1831                        Some((name, KafkaMetadataKind::Offset))
1832                    }
1833                    SourceIncludeMetadata::Headers { alias } => {
1834                        let name = match alias {
1835                            Some(name) => name.to_string(),
1836                            None => "headers".to_owned(),
1837                        };
1838                        Some((name, KafkaMetadataKind::Headers))
1839                    }
1840                    SourceIncludeMetadata::Header {
1841                        alias,
1842                        key,
1843                        use_bytes,
1844                    } => Some((
1845                        alias.to_string(),
1846                        KafkaMetadataKind::Header {
1847                            key: key.clone(),
1848                            use_bytes: *use_bytes,
1849                        },
1850                    )),
1851                    SourceIncludeMetadata::Key { .. } => {
1852                        // handled below
1853                        None
1854                    }
1855                })
1856                .collect();
1857
1858            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1859        }
1860    };
1861
1862    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1863
1864    // Some source types (e.g. Postgres, MySQL, multi-output load generators) define a
1865    // value schema during purification and populate the `columns` and `constraints`
1866    // fields of the statement, whereas other source types (e.g. Kafka, single-output
1867    // load generators) do not; for those we use the source connection's default schema.
1868    let (key_desc, value_desc) =
1869        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1870            let columns = match columns {
1871                TableFromSourceColumns::Defined(columns) => columns,
1872                _ => unreachable!(),
1873            };
1874            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1875            (None, desc)
1876        } else {
1877            let key_desc = source_connection.default_key_desc();
1878            let value_desc = source_connection.default_value_desc();
1879            (Some(key_desc), value_desc)
1880        };
1881
1882    let metadata_columns_desc = match &details {
1883        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1884            metadata_columns, ..
1885        }) => kafka_metadata_columns_desc(metadata_columns),
1886        _ => vec![],
1887    };
1888
1889    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1890        scx,
1891        &envelope,
1892        format,
1893        key_desc,
1894        value_desc,
1895        include_metadata,
1896        metadata_columns_desc,
1897        source_connection,
1898    )?;
1899    if let TableFromSourceColumns::Named(col_names) = columns {
1900        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1901    }
1902
1903    let names: Vec<_> = desc.iter_names().cloned().collect();
1904    if let Some(dup) = names.iter().duplicates().next() {
1905        sql_bail!("column {} specified more than once", dup.quoted());
1906    }
1907
1908    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1909
1910    // Determine a default timeline for the source: CDCv2 exports run on an
1911    // external timeline, while everything else uses the epoch-milliseconds timeline.
1912    let timeline = match envelope {
1913        SourceEnvelope::CdcV2 => {
1914            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1915        }
1916        _ => Timeline::EpochMilliseconds,
1917    };
1918
1919    if let Some(partition_by) = partition_by {
1920        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1921        check_partition_by(&desc, partition_by)?;
1922    }
1923
1924    let data_source = DataSourceDesc::IngestionExport {
1925        ingestion_id,
1926        external_reference: external_reference
1927            .as_ref()
1928            .expect("populated in purification")
1929            .clone(),
1930        details,
1931        data_config: SourceExportDataConfig { envelope, encoding },
1932    };
1933
1934    let if_not_exists = *if_not_exists;
1935
1936    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1937
1938    let table = Table {
1939        create_sql,
1940        desc: VersionedRelationDesc::new(desc),
1941        temporary: false,
1942        compaction_window: None,
1943        data_source: TableDataSource::DataSource {
1944            desc: data_source,
1945            timeline,
1946        },
1947    };
1948
1949    Ok(Plan::CreateTable(CreateTablePlan {
1950        name,
1951        table,
1952        if_not_exists,
1953    }))
1954}
1955
1956generate_extracted_config!(
1957    LoadGeneratorOption,
1958    (TickInterval, Duration),
1959    (AsOf, u64, Default(0_u64)),
1960    (UpTo, u64, Default(u64::MAX)),
1961    (ScaleFactor, f64),
1962    (MaxCardinality, u64),
1963    (Keys, u64),
1964    (SnapshotRounds, u64),
1965    (TransactionalSnapshot, bool),
1966    (ValueSize, u64),
1967    (Seed, u64),
1968    (Partitions, u64),
1969    (BatchSize, u64)
1970);
1971
1972impl LoadGeneratorOptionExtracted {
1973    pub(super) fn ensure_only_valid_options(
1974        &self,
1975        loadgen: &ast::LoadGenerator,
1976    ) -> Result<(), PlanError> {
1977        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
1978
1979        let mut options = self.seen.clone();
1980
1981        let permitted_options: &[_] = match loadgen {
1982            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
1983            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
1984            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
1985            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
1986            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
1987            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
1988            ast::LoadGenerator::KeyValue => &[
1989                TickInterval,
1990                Keys,
1991                SnapshotRounds,
1992                TransactionalSnapshot,
1993                ValueSize,
1994                Seed,
1995                Partitions,
1996                BatchSize,
1997            ],
1998        };
1999
2000        for o in permitted_options {
2001            options.remove(o);
2002        }
2003
2004        if !options.is_empty() {
2005            sql_bail!(
2006                "{} load generators do not support {} values",
2007                loadgen,
2008                options.iter().join(", ")
2009            )
2010        }
2011
2012        Ok(())
2013    }
2014}
2015
2016pub(crate) fn load_generator_ast_to_generator(
2017    scx: &StatementContext,
2018    loadgen: &ast::LoadGenerator,
2019    options: &[LoadGeneratorOption<Aug>],
2020    include_metadata: &[SourceIncludeMetadata],
2021) -> Result<LoadGenerator, PlanError> {
2022    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2023    extracted.ensure_only_valid_options(loadgen)?;
2024
2025    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2026        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2027    }
2028
2029    let load_generator = match loadgen {
2030        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2031        ast::LoadGenerator::Clock => {
2032            scx.require_feature_flag(&crate::session::vars::ENABLE_CLOCK_LOAD_GENERATOR)?;
2033            LoadGenerator::Clock
2034        }
2035        ast::LoadGenerator::Counter => {
2036            let LoadGeneratorOptionExtracted {
2037                max_cardinality, ..
2038            } = extracted;
2039            LoadGenerator::Counter { max_cardinality }
2040        }
2041        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2042        ast::LoadGenerator::Datums => LoadGenerator::Datums,
2043        ast::LoadGenerator::Tpch => {
2044            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2045
2046            // Default to a 0.01 scale factor (roughly 10MB of data).
2047            let sf: f64 = scale_factor.unwrap_or(0.01);
2048            if !sf.is_finite() || sf < 0.0 {
2049                sql_bail!("unsupported scale factor {sf}");
2050            }
2051
2052            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2053                let total = (sf * multiplier).floor();
2054                let mut i = i64::try_cast_from(total)
2055                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2056                if i < 1 {
2057                    i = 1;
2058                }
2059                Ok(i)
2060            };
2061
2062            // The multiplications here are safely unchecked because they will
2063            // overflow to infinity, which is then caught by `f_to_i`.
2064            let count_supplier = f_to_i(10_000f64)?;
2065            let count_part = f_to_i(200_000f64)?;
2066            let count_customer = f_to_i(150_000f64)?;
2067            let count_orders = f_to_i(150_000f64 * 10f64)?;
2068            let count_clerk = f_to_i(1_000f64)?;
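            // For example, the default scale factor of 0.01 yields count_supplier = 100,
            // count_part = 2_000, count_customer = 1_500, count_orders = 15_000, and
            // count_clerk = 10.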
2069
2070            LoadGenerator::Tpch {
2071                count_supplier,
2072                count_part,
2073                count_customer,
2074                count_orders,
2075                count_clerk,
2076            }
2077        }
2078        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2079            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2080            let LoadGeneratorOptionExtracted {
2081                keys,
2082                snapshot_rounds,
2083                transactional_snapshot,
2084                value_size,
2085                tick_interval,
2086                seed,
2087                partitions,
2088                batch_size,
2089                ..
2090            } = extracted;
2091
2092            let mut include_offset = None;
2093            for im in include_metadata {
2094                match im {
2095                    SourceIncludeMetadata::Offset { alias } => {
2096                        include_offset = match alias {
2097                            Some(alias) => Some(alias.to_string()),
2098                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2099                        }
2100                    }
2101                    SourceIncludeMetadata::Key { .. } => continue,
2102
2103                    _ => {
2104                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2105                    }
2106                };
2107            }
2108
2109            let lgkv = KeyValueLoadGenerator {
2110                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2111                snapshot_rounds: snapshot_rounds
2112                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2113                // Defaults to true.
2114                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2115                value_size: value_size
2116                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2117                partitions: partitions
2118                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2119                tick_interval,
2120                batch_size: batch_size
2121                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2122                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2123                include_offset,
2124            };
2125
2126            if lgkv.keys == 0
2127                || lgkv.partitions == 0
2128                || lgkv.value_size == 0
2129                || lgkv.batch_size == 0
2130            {
2131                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2132            }
2133
2134            if lgkv.keys % lgkv.partitions != 0 {
2135                sql_bail!("KEYS must be a multiple of PARTITIONS")
2136            }
2137
2138            if lgkv.batch_size > lgkv.keys {
2139                sql_bail!("KEYS must be at least as large as BATCH SIZE")
2140            }
2141
2142            // This constraint simplifies the source implementation;
2143            // we can lift it later.
2144            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2145                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2146            }
2147
2148            if lgkv.snapshot_rounds == 0 {
2149                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2150            }
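
            // For example (illustrative values), KEYS 128, PARTITIONS 4, and BATCH SIZE 8
            // satisfy the divisibility checks above: 128 % 4 == 0, 8 <= 128, and
            // (128 / 4) % 8 == 0.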
2151
2152            LoadGenerator::KeyValue(lgkv)
2153        }
2154    };
2155
2156    Ok(load_generator)
2157}
2158
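/// Typechecks the value relation of a Debezium-enveloped source: it must have a
/// record-typed `after` column and may have a `before` column of the same record
/// type, mirroring the usual Debezium change event `{"before": ..., "after": ...}`.
/// Returns the column indexes of `before` (if present) and `after`.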
2159fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2160    let before = value_desc.get_by_name(&"before".into());
2161    let (after_idx, after_ty) = value_desc
2162        .get_by_name(&"after".into())
2163        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2164    let before_idx = if let Some((before_idx, before_ty)) = before {
2165        if !matches!(before_ty.scalar_type, ScalarType::Record { .. }) {
2166            sql_bail!("'before' column must be of type record");
2167        }
2168        if before_ty != after_ty {
2169            sql_bail!("'before' type differs from 'after' column");
2170        }
2171        Some(before_idx)
2172    } else {
2173        None
2174    };
2175    Ok((before_idx, after_idx))
2176}
2177
2178fn get_encoding(
2179    scx: &StatementContext,
2180    format: &FormatSpecifier<Aug>,
2181    envelope: &ast::SourceEnvelope,
2182) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2183    let encoding = match format {
2184        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2185        FormatSpecifier::KeyValue { key, value } => {
2186            let key = {
2187                let encoding = get_encoding_inner(scx, key)?;
2188                Some(encoding.key.unwrap_or(encoding.value))
2189            };
2190            let value = get_encoding_inner(scx, value)?.value;
2191            SourceDataEncoding { key, value }
2192        }
2193    };
2194
2195    let requires_keyvalue = matches!(
2196        envelope,
2197        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2198    );
2199    let is_keyvalue = encoding.key.is_some();
2200    if requires_keyvalue && !is_keyvalue {
2201        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2202    };
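    // e.g. (illustrative) `ENVELOPE UPSERT` needs `KEY FORMAT ... VALUE FORMAT ...`,
    // or a bare Avro format whose schema registry entry supplies both key and value
    // schemas; a bare `FORMAT BYTES` is rejected above.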
2203
2204    Ok(encoding)
2205}
2206
2207/// Determine the cluster to use for this item.
2208///
2209/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2210/// Because of this, do not normalize/canonicalize the create SQL statement
2211/// until after calling this function.
2212fn source_sink_cluster_config<'a, 'ctx>(
2213    scx: &'a StatementContext<'ctx>,
2214    in_cluster: &mut Option<ResolvedClusterName>,
2215) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2216    let cluster = match in_cluster {
2217        None => {
2218            let cluster = scx.catalog.resolve_cluster(None)?;
2219            *in_cluster = Some(ResolvedClusterName {
2220                id: cluster.id(),
2221                print_name: None,
2222            });
2223            cluster
2224        }
2225        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2226    };
2227
2228    Ok(cluster)
2229}
2230
2231generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2232
2233#[derive(Debug)]
2234pub struct Schema {
2235    pub key_schema: Option<String>,
2236    pub value_schema: String,
2237    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2238    pub confluent_wire_format: bool,
2239}
2240
2241fn get_encoding_inner(
2242    scx: &StatementContext,
2243    format: &Format<Aug>,
2244) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2245    let value = match format {
2246        Format::Bytes => DataEncoding::Bytes,
2247        Format::Avro(schema) => {
2248            let Schema {
2249                key_schema,
2250                value_schema,
2251                csr_connection,
2252                confluent_wire_format,
2253            } = match schema {
2254                // TODO(jldlaughlin): we need a way to pass in primary key information
2255                // when building a source from a string or file.
2256                AvroSchema::InlineSchema {
2257                    schema: ast::Schema { schema },
2258                    with_options,
2259                } => {
2260                    let AvroSchemaOptionExtracted {
2261                        confluent_wire_format,
2262                        ..
2263                    } = with_options.clone().try_into()?;
2264
2265                    Schema {
2266                        key_schema: None,
2267                        value_schema: schema.clone(),
2268                        csr_connection: None,
2269                        confluent_wire_format,
2270                    }
2271                }
2272                AvroSchema::Csr {
2273                    csr_connection:
2274                        CsrConnectionAvro {
2275                            connection,
2276                            seed,
2277                            key_strategy: _,
2278                            value_strategy: _,
2279                        },
2280                } => {
2281                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2282                    let csr_connection = match item.connection()? {
2283                        Connection::Csr(_) => item.id(),
2284                        _ => {
2285                            sql_bail!(
2286                                "{} is not a schema registry connection",
2287                                scx.catalog
2288                                    .resolve_full_name(item.name())
2289                                    .to_string()
2290                                    .quoted()
2291                            )
2292                        }
2293                    };
2294
2295                    if let Some(seed) = seed {
2296                        Schema {
2297                            key_schema: seed.key_schema.clone(),
2298                            value_schema: seed.value_schema.clone(),
2299                            csr_connection: Some(csr_connection),
2300                            confluent_wire_format: true,
2301                        }
2302                    } else {
2303                        unreachable!("CSR seed resolution should already have been called: Avro")
2304                    }
2305                }
2306            };
2307
2308            if let Some(key_schema) = key_schema {
2309                return Ok(SourceDataEncoding {
2310                    key: Some(DataEncoding::Avro(AvroEncoding {
2311                        schema: key_schema,
2312                        csr_connection: csr_connection.clone(),
2313                        confluent_wire_format,
2314                    })),
2315                    value: DataEncoding::Avro(AvroEncoding {
2316                        schema: value_schema,
2317                        csr_connection,
2318                        confluent_wire_format,
2319                    }),
2320                });
2321            } else {
2322                DataEncoding::Avro(AvroEncoding {
2323                    schema: value_schema,
2324                    csr_connection,
2325                    confluent_wire_format,
2326                })
2327            }
2328        }
2329        Format::Protobuf(schema) => match schema {
2330            ProtobufSchema::Csr {
2331                csr_connection:
2332                    CsrConnectionProtobuf {
2333                        connection:
2334                            CsrConnection {
2335                                connection,
2336                                options,
2337                            },
2338                        seed,
2339                    },
2340            } => {
2341                if let Some(CsrSeedProtobuf { key, value }) = seed {
2342                    let item = scx.get_item_by_resolved_name(connection)?;
2343                    let _ = match item.connection()? {
2344                        Connection::Csr(connection) => connection,
2345                        _ => {
2346                            sql_bail!(
2347                                "{} is not a schema registry connection",
2348                                scx.catalog
2349                                    .resolve_full_name(item.name())
2350                                    .to_string()
2351                                    .quoted()
2352                            )
2353                        }
2354                    };
2355
2356                    if !options.is_empty() {
2357                        sql_bail!("Protobuf CSR connections do not support any options");
2358                    }
2359
2360                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2361                        descriptors: strconv::parse_bytes(&value.schema)?,
2362                        message_name: value.message_name.clone(),
2363                        confluent_wire_format: true,
2364                    });
2365                    if let Some(key) = key {
2366                        return Ok(SourceDataEncoding {
2367                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2368                                descriptors: strconv::parse_bytes(&key.schema)?,
2369                                message_name: key.message_name.clone(),
2370                                confluent_wire_format: true,
2371                            })),
2372                            value,
2373                        });
2374                    }
2375                    value
2376                } else {
2377                    unreachable!("CSR seed resolution should already have been called: Proto")
2378                }
2379            }
2380            ProtobufSchema::InlineSchema {
2381                message_name,
2382                schema: ast::Schema { schema },
2383            } => {
2384                let descriptors = strconv::parse_bytes(schema)?;
2385
2386                DataEncoding::Protobuf(ProtobufEncoding {
2387                    descriptors,
2388                    message_name: message_name.to_owned(),
2389                    confluent_wire_format: false,
2390                })
2391            }
2392        },
2393        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2394            regex: mz_repr::adt::regex::Regex::new(regex, false)
2395                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2396        }),
2397        Format::Csv { columns, delimiter } => {
2398            let columns = match columns {
2399                CsvColumns::Header { names } => {
2400                    if names.is_empty() {
2401                        sql_bail!("[internal error] column spec should get names in purify")
2402                    }
2403                    ColumnSpec::Header {
2404                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2405                    }
2406                }
2407                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2408            };
2409            DataEncoding::Csv(CsvEncoding {
2410                columns,
2411                delimiter: u8::try_from(*delimiter)
2412                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2413            })
2414        }
2415        Format::Json { array: false } => DataEncoding::Json,
2416        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2417        Format::Text => DataEncoding::Text,
2418    };
2419    Ok(SourceDataEncoding { key: None, value })
2420}
2421
2422/// Extract the key envelope, if it is requested
2423fn get_key_envelope(
2424    included_items: &[SourceIncludeMetadata],
2425    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2426    key_envelope_no_encoding: bool,
2427) -> Result<KeyEnvelope, PlanError> {
2428    let key_definition = included_items
2429        .iter()
2430        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2431    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2432        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2433            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2434            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2435            (Some(name), _) if key_envelope_no_encoding => {
2436                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2437            }
2438            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2439            (_, None) => {
2440                // `alias == None` means `INCLUDE KEY`;
2441                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
2442                // Both cases make sense with the same error message.
2443                sql_bail!(
2444                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2445                        got bare FORMAT"
2446                );
2447            }
2448        }
2449    } else {
2450        Ok(KeyEnvelope::None)
2451    }
2452}
2453
2454/// Gets the key envelope for a given key encoding when no name for the key has
2455/// been requested by the user.
2456fn get_unnamed_key_envelope(
2457    key: Option<&DataEncoding<ReferencedConnection>>,
2458) -> Result<KeyEnvelope, PlanError> {
2459    // If the key is requested but comes from an unnamed type, then it gets
2460    // the name "key".
2461    // Otherwise it gets the names of the columns in the type.
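    //
    // For example (illustrative): a `KEY FORMAT TEXT` key becomes a single column
    // named "key", while a `KEY FORMAT AVRO ...` record key is flattened into one
    // column per record field.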
2462    let is_composite = match key {
2463        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2464        Some(
2465            DataEncoding::Avro(_)
2466            | DataEncoding::Csv(_)
2467            | DataEncoding::Protobuf(_)
2468            | DataEncoding::Regex { .. },
2469        ) => true,
2470        None => false,
2471    };
2472
2473    if is_composite {
2474        Ok(KeyEnvelope::Flattened)
2475    } else {
2476        Ok(KeyEnvelope::Named("key".to_string()))
2477    }
2478}
2479
2480pub fn describe_create_view(
2481    _: &StatementContext,
2482    _: CreateViewStatement<Aug>,
2483) -> Result<StatementDesc, PlanError> {
2484    Ok(StatementDesc::new(None))
2485}
2486
2487pub fn plan_view(
2488    scx: &StatementContext,
2489    def: &mut ViewDefinition<Aug>,
2490    temporary: bool,
2491) -> Result<(QualifiedItemName, View), PlanError> {
2492    let create_sql = normalize::create_statement(
2493        scx,
2494        Statement::CreateView(CreateViewStatement {
2495            if_exists: IfExistsBehavior::Error,
2496            temporary,
2497            definition: def.clone(),
2498        }),
2499    )?;
2500
2501    let ViewDefinition {
2502        name,
2503        columns,
2504        query,
2505    } = def;
2506
2507    let query::PlannedRootQuery {
2508        expr,
2509        mut desc,
2510        finishing,
2511        scope: _,
2512    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2513    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2514    // Note: Earlier, we considered persisting the finishing information with the view
2515    // here to help with database-issues#236. However, in the meantime, there might be
2516    // a better approach to solving database-issues#236:
2517    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2518    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2519        &finishing,
2520        expr.arity()
2521    ));
2522    if expr.contains_parameters()? {
2523        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2524    }
2525
2526    let dependencies = expr
2527        .depends_on()
2528        .into_iter()
2529        .map(|gid| scx.catalog.resolve_item_id(&gid))
2530        .collect();
2531
2532    let name = if temporary {
2533        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2534    } else {
2535        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2536    };
2537
2538    plan_utils::maybe_rename_columns(
2539        format!("view {}", scx.catalog.resolve_full_name(&name)),
2540        &mut desc,
2541        columns,
2542    )?;
2543    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2544
2545    if let Some(dup) = names.iter().duplicates().next() {
2546        sql_bail!("column {} specified more than once", dup.quoted());
2547    }
2548
2549    let view = View {
2550        create_sql,
2551        expr,
2552        dependencies,
2553        column_names: names,
2554        temporary,
2555    };
2556
2557    Ok((name, view))
2558}
2559
2560pub fn plan_create_view(
2561    scx: &StatementContext,
2562    mut stmt: CreateViewStatement<Aug>,
2563) -> Result<Plan, PlanError> {
2564    let CreateViewStatement {
2565        temporary,
2566        if_exists,
2567        definition,
2568    } = &mut stmt;
2569    let (name, view) = plan_view(scx, definition, *temporary)?;
2570
2571    // Override the statement-level IfExistsBehavior with Skip if this is
2572    // explicitly requested in the PlanContext (the default is `false`).
2573    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2574
2575    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2576        let if_exists = true;
2577        let cascade = false;
2578        let maybe_item_to_drop = plan_drop_item(
2579            scx,
2580            ObjectType::View,
2581            if_exists,
2582            definition.name.clone(),
2583            cascade,
2584        )?;
2585
2586        // Check if the new View depends on the item that we would be replacing.
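        // For example (illustrative), `CREATE OR REPLACE VIEW v AS SELECT ... FROM v`
        // must be rejected: dropping the old `v` would invalidate the new definition.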
2587        if let Some(id) = maybe_item_to_drop {
2588            let dependencies = view.expr.depends_on();
2589            let invalid_drop = scx
2590                .get_item(&id)
2591                .global_ids()
2592                .any(|gid| dependencies.contains(&gid));
2593            if invalid_drop {
2594                let item = scx.catalog.get_item(&id);
2595                sql_bail!(
2596                    "cannot replace view {0}: depended upon by new {0} definition",
2597                    scx.catalog.resolve_full_name(item.name())
2598                );
2599            }
2600
2601            Some(id)
2602        } else {
2603            None
2604        }
2605    } else {
2606        None
2607    };
2608    let drop_ids = replace
2609        .map(|id| {
2610            scx.catalog
2611                .item_dependents(id)
2612                .into_iter()
2613                .map(|id| id.unwrap_item_id())
2614                .collect()
2615        })
2616        .unwrap_or_default();
2617
2618    // Check for an object in the catalog with this same name
2619    let full_name = scx.catalog.resolve_full_name(&name);
2620    let partial_name = PartialItemName::from(full_name.clone());
2621    // For PostgreSQL compatibility, we need to prevent creating views when
2622    // there is an existing object *or* type of the same name.
2623    if let (Ok(item), IfExistsBehavior::Error, false) = (
2624        scx.catalog.resolve_item_or_type(&partial_name),
2625        *if_exists,
2626        ignore_if_exists_errors,
2627    ) {
2628        return Err(PlanError::ItemAlreadyExists {
2629            name: full_name.to_string(),
2630            item_type: item.item_type(),
2631        });
2632    }
2633
2634    Ok(Plan::CreateView(CreateViewPlan {
2635        name,
2636        view,
2637        replace,
2638        drop_ids,
2639        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2640        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2641    }))
2642}
2643
2644pub fn describe_create_materialized_view(
2645    _: &StatementContext,
2646    _: CreateMaterializedViewStatement<Aug>,
2647) -> Result<StatementDesc, PlanError> {
2648    Ok(StatementDesc::new(None))
2649}
2650
2651pub fn describe_create_continual_task(
2652    _: &StatementContext,
2653    _: CreateContinualTaskStatement<Aug>,
2654) -> Result<StatementDesc, PlanError> {
2655    Ok(StatementDesc::new(None))
2656}
2657
2658pub fn describe_create_network_policy(
2659    _: &StatementContext,
2660    _: CreateNetworkPolicyStatement<Aug>,
2661) -> Result<StatementDesc, PlanError> {
2662    Ok(StatementDesc::new(None))
2663}
2664
2665pub fn describe_alter_network_policy(
2666    _: &StatementContext,
2667    _: AlterNetworkPolicyStatement<Aug>,
2668) -> Result<StatementDesc, PlanError> {
2669    Ok(StatementDesc::new(None))
2670}
2671
2672pub fn plan_create_materialized_view(
2673    scx: &StatementContext,
2674    mut stmt: CreateMaterializedViewStatement<Aug>,
2675) -> Result<Plan, PlanError> {
2676    let cluster_id =
2677        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2678    stmt.in_cluster = Some(ResolvedClusterName {
2679        id: cluster_id,
2680        print_name: None,
2681    });
2682
2683    let create_sql =
2684        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2685
2686    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2687    let name = scx.allocate_qualified_name(partial_name.clone())?;
2688
2689    let query::PlannedRootQuery {
2690        expr,
2691        mut desc,
2692        finishing,
2693        scope: _,
2694    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2695    // We get back a trivial finishing, see comment in `plan_view`.
2696    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2697        &finishing,
2698        expr.arity()
2699    ));
2700    if expr.contains_parameters()? {
2701        return Err(PlanError::ParameterNotAllowed(
2702            "materialized views".to_string(),
2703        ));
2704    }
2705
2706    plan_utils::maybe_rename_columns(
2707        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2708        &mut desc,
2709        &stmt.columns,
2710    )?;
2711    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2712
2713    let MaterializedViewOptionExtracted {
2714        assert_not_null,
2715        partition_by,
2716        retain_history,
2717        refresh,
2718        seen: _,
2719    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2720
2721    if let Some(partition_by) = partition_by {
2722        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2723        check_partition_by(&desc, partition_by)?;
2724    }
2725
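    // Fold all REFRESH options into a single `RefreshSchedule`. For example,
    // `REFRESH EVERY '1 hour' ALIGNED TO '2024-01-01 00:00:00'` (an
    // illustrative value) schedules a refresh at every whole hour, anchored at
    // the given timestamp, while each `REFRESH AT` adds a one-off refresh time.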
2726    let refresh_schedule = {
2727        let mut refresh_schedule = RefreshSchedule::default();
2728        let mut on_commits_seen = 0;
2729        for refresh_option_value in refresh {
2730            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2731                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2732            }
2733            match refresh_option_value {
2734                RefreshOptionValue::OnCommit => {
2735                    on_commits_seen += 1;
2736                }
2737                RefreshOptionValue::AtCreation => {
2738                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2739                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2740                }
2741                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2742                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2743                    let ecx = &ExprContext {
2744                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2745                        name: "REFRESH AT",
2746                        scope: &Scope::empty(),
2747                        relation_type: &RelationType::empty(),
2748                        allow_aggregates: false,
2749                        allow_subqueries: false,
2750                        allow_parameters: false,
2751                        allow_windows: false,
2752                    };
2753                    let hir = plan_expr(ecx, &time)?.cast_to(
2754                        ecx,
2755                        CastContext::Assignment,
2756                        &ScalarType::MzTimestamp,
2757                    )?;
2758                    // (mz_now was purified away to a literal earlier)
2759                    let timestamp = hir
2760                        .into_literal_mz_timestamp()
2761                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2762                    refresh_schedule.ats.push(timestamp);
2763                }
2764                RefreshOptionValue::Every(RefreshEveryOptionValue {
2765                    interval,
2766                    aligned_to,
2767                }) => {
2768                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2769                    if interval.as_microseconds() <= 0 {
2770                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2771                    }
2772                    if interval.months != 0 {
2773                        // This limitation exists because we want Intervals to be cleanly
2774                        // convertible to a unix epoch timestamp difference. That is no
2775                        // longer true once the interval involves months, because months
2776                        // have variable lengths. See `Timestamp::round_up`.
2777                        sql_bail!("REFRESH interval must not involve units larger than days");
2778                    }
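                    // For example, `REFRESH EVERY '1 month'` is rejected, while
                    // the fixed-length `REFRESH EVERY '30 days'` is accepted.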
2779                    let interval = interval.duration()?;
2780                    if u64::try_from(interval.as_millis()).is_err() {
2781                        sql_bail!("REFRESH interval too large");
2782                    }
2783
2784                    let mut aligned_to = match aligned_to {
2785                        Some(aligned_to) => aligned_to,
2786                        None => {
2787                            soft_panic_or_log!(
2788                                "ALIGNED TO should have been filled in by purification"
2789                            );
2790                            sql_bail!(
2791                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2792                            )
2793                        }
2794                    };
2795
2796                    // Desugar the `aligned_to` expression
2797                    transform_ast::transform(scx, &mut aligned_to)?;
2798
2799                    let ecx = &ExprContext {
2800                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2801                        name: "REFRESH EVERY ... ALIGNED TO",
2802                        scope: &Scope::empty(),
2803                        relation_type: &RelationType::empty(),
2804                        allow_aggregates: false,
2805                        allow_subqueries: false,
2806                        allow_parameters: false,
2807                        allow_windows: false,
2808                    };
2809                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2810                        ecx,
2811                        CastContext::Assignment,
2812                        &ScalarType::MzTimestamp,
2813                    )?;
2814                    // (mz_now was purified away to a literal earlier)
2815                    let aligned_to_const = aligned_to_hir
2816                        .into_literal_mz_timestamp()
2817                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2818
2819                    refresh_schedule.everies.push(RefreshEvery {
2820                        interval,
2821                        aligned_to: aligned_to_const,
2822                    });
2823                }
2824            }
2825        }
2826
2827        if on_commits_seen > 1 {
2828            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2829        }
2830        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2831            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2832        }
2833
2834        if refresh_schedule == RefreshSchedule::default() {
2835            None
2836        } else {
2837            Some(refresh_schedule)
2838        }
2839    };
2840
2841    let as_of = stmt.as_of.map(Timestamp::from);
2842    let compaction_window = plan_retain_history_option(scx, retain_history)?;
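    // Resolve each ASSERT NOT NULL column name to its position in the
    // materialized view's output relation.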
2843    let mut non_null_assertions = assert_not_null
2844        .into_iter()
2845        .map(normalize::column_name)
2846        .map(|assertion_name| {
2847            column_names
2848                .iter()
2849                .position(|col| col == &assertion_name)
2850                .ok_or_else(|| {
2851                    sql_err!(
2852                        "column {} in ASSERT NOT NULL option not found",
2853                        assertion_name.quoted()
2854                    )
2855                })
2856        })
2857        .collect::<Result<Vec<_>, _>>()?;
2858    non_null_assertions.sort();
2859    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2860        let dup = &column_names[*dup];
2861        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2862    }
2863
2864    if let Some(dup) = column_names.iter().duplicates().next() {
2865        sql_bail!("column {} specified more than once", dup.quoted());
2866    }
2867
2868    // Override the statement-level IfExistsBehavior with Skip if this is
2869    // explicitly requested in the PlanContext (the default is `false`).
2870    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2871        Ok(true) => IfExistsBehavior::Skip,
2872        _ => stmt.if_exists,
2873    };
2874
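    // For `CREATE OR REPLACE`, plan the drop of the existing object (and its
    // dependents), but reject the plan if the new definition would depend on
    // the object being replaced.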
2875    let mut replace = None;
2876    let mut if_not_exists = false;
2877    match if_exists {
2878        IfExistsBehavior::Replace => {
2879            let if_exists = true;
2880            let cascade = false;
2881            let replace_id = plan_drop_item(
2882                scx,
2883                ObjectType::MaterializedView,
2884                if_exists,
2885                partial_name.into(),
2886                cascade,
2887            )?;
2888
2889            // Check whether the new materialized view depends on the item we would be replacing.
2890            if let Some(id) = replace_id {
2891                let dependencies = expr.depends_on();
2892                let invalid_drop = scx
2893                    .get_item(&id)
2894                    .global_ids()
2895                    .any(|gid| dependencies.contains(&gid));
2896                if invalid_drop {
2897                    let item = scx.catalog.get_item(&id);
2898                    sql_bail!(
2899                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2900                        scx.catalog.resolve_full_name(item.name())
2901                    );
2902                }
2903                replace = Some(id);
2904            }
2905        }
2906        IfExistsBehavior::Skip => if_not_exists = true,
2907        IfExistsBehavior::Error => (),
2908    }
2909    let drop_ids = replace
2910        .map(|id| {
2911            scx.catalog
2912                .item_dependents(id)
2913                .into_iter()
2914                .map(|id| id.unwrap_item_id())
2915                .collect()
2916        })
2917        .unwrap_or_default();
2918    let dependencies = expr
2919        .depends_on()
2920        .into_iter()
2921        .map(|gid| scx.catalog.resolve_item_id(&gid))
2922        .collect();
2923
2924    // Check for an object in the catalog with this same name
2925    let full_name = scx.catalog.resolve_full_name(&name);
2926    let partial_name = PartialItemName::from(full_name.clone());
2927    // For PostgreSQL compatibility, we need to prevent creating materialized
2928    // views when there is an existing object *or* type of the same name.
2929    if let (IfExistsBehavior::Error, Ok(item)) =
2930        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
2931    {
2932        return Err(PlanError::ItemAlreadyExists {
2933            name: full_name.to_string(),
2934            item_type: item.item_type(),
2935        });
2936    }
2937
2938    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
2939        name,
2940        materialized_view: MaterializedView {
2941            create_sql,
2942            expr,
2943            dependencies,
2944            column_names,
2945            cluster_id,
2946            non_null_assertions,
2947            compaction_window,
2948            refresh_schedule,
2949            as_of,
2950        },
2951        replace,
2952        drop_ids,
2953        if_not_exists,
2954        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2955    }))
2956}
2957
2958generate_extracted_config!(
2959    MaterializedViewOption,
2960    (AssertNotNull, Ident, AllowMultiple),
2961    (PartitionBy, Vec<Ident>),
2962    (RetainHistory, OptionalDuration),
2963    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
2964);
2965
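/// Plans a `CREATE CONTINUAL TASK` statement.
///
/// A sketch of the unsugared shape this handles (names are illustrative):
///
/// ```sql
/// CREATE CONTINUAL TASK ct (key int, val int) ON INPUT t AS (
///     DELETE FROM ct WHERE key IN (SELECT key FROM t);
///     INSERT INTO ct SELECT key, val FROM t;
/// );
/// ```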
2966pub fn plan_create_continual_task(
2967    scx: &StatementContext,
2968    mut stmt: CreateContinualTaskStatement<Aug>,
2969) -> Result<Plan, PlanError> {
2970    match &stmt.sugar {
2971        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
2972        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
2973            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
2974        }
2975        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
2976            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
2977        }
2978    };
2979    let cluster_id = match &stmt.in_cluster {
2980        None => scx.catalog.resolve_cluster(None)?.id(),
2981        Some(in_cluster) => in_cluster.id,
2982    };
2983    stmt.in_cluster = Some(ResolvedClusterName {
2984        id: cluster_id,
2985        print_name: None,
2986    });
2987
2988    let create_sql =
2989        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
2990
2991    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
2992
2993    // It seems desirable for a CT that, e.g., simply filters the input to keep
2994    // the same nullability. So start by assuming all columns are non-nullable,
2995    // and then make them nullable below if any of the exprs plan them as
2996    // nullable.
2997    let mut desc = match stmt.columns {
2998        None => None,
2999        Some(columns) => {
3000            let mut desc_columns = Vec::with_capacity(columns.len());
3001            for col in columns.iter() {
3002                desc_columns.push((
3003                    normalize::column_name(col.name.clone()),
3004                    ColumnType {
3005                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3006                        nullable: false,
3007                    },
3008                ));
3009            }
3010            Some(RelationDesc::from_names_and_types(desc_columns))
3011        }
3012    };
3013    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3014    match input.item_type() {
3015        // The input must be an object directly backed by a persist shard, so that
3016        // we can use a persist listen to rehydrate it efficiently.
3017        CatalogItemType::ContinualTask
3018        | CatalogItemType::Table
3019        | CatalogItemType::MaterializedView
3020        | CatalogItemType::Source => {}
3021        CatalogItemType::Sink
3022        | CatalogItemType::View
3023        | CatalogItemType::Index
3024        | CatalogItemType::Type
3025        | CatalogItemType::Func
3026        | CatalogItemType::Secret
3027        | CatalogItemType::Connection => {
3028            sql_bail!(
3029                "CONTINUAL TASK cannot use {} as an input",
3030                input.item_type()
3031            );
3032        }
3033    }
3034
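    // Make the continual task available to its own defining statements as a
    // CTE, so that e.g. a DELETE statement can reference the task's current
    // output.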
3035    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3036    let ct_name = stmt.name;
3037    let placeholder_id = match &ct_name {
3038        ResolvedItemName::ContinualTask { id, name } => {
3039            let desc = match desc.as_ref().cloned() {
3040                Some(x) => x,
3041                None => {
3042                    // The user didn't specify the CT's columns. Take a wild
3043                    // guess that the CT has the same shape as the input. It's
3044                    // fine if this is wrong; we'll get an error below after
3045                    // planning the query.
3046                    let input_name = scx.catalog.resolve_full_name(input.name());
3047                    input.desc(&input_name)?.into_owned()
3048                }
3049            };
3050            qcx.ctes.insert(
3051                *id,
3052                CteDesc {
3053                    name: name.item.clone(),
3054                    desc,
3055                },
3056            );
3057            Some(*id)
3058        }
3059        _ => None,
3060    };
3061
3062    let mut exprs = Vec::new();
3063    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3064        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3065        let query::PlannedRootQuery {
3066            expr,
3067            desc: desc_query,
3068            finishing,
3069            scope: _,
3070        } = query::plan_ct_query(&mut qcx, query)?;
3071        // We get back a trivial finishing because we plan with a "maintained"
3072        // QueryLifetime; see the comment in `plan_view`.
3073        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3074            &finishing,
3075            expr.arity()
3076        ));
3077        if expr.contains_parameters()? {
3078            return Err(PlanError::ParameterNotAllowed(
3079                "continual tasks".to_string(),
3080            ));
3081        }
3084        let expr = match desc.as_mut() {
3085            None => {
3086                desc = Some(desc_query);
3087                expr
3088            }
3089            Some(desc) => {
3090                // We specify the columns for DELETE ourselves, so if the arities
3091                // don't match, the offending statement must be an INSERT.
3092                if desc_query.arity() > desc.arity() {
3093                    sql_bail!(
3094                        "statement {}: INSERT has more expressions than target columns",
3095                        idx
3096                    );
3097                }
3098                if desc_query.arity() < desc.arity() {
3099                    sql_bail!(
3100                        "statement {}: INSERT has more target columns than expressions",
3101                        idx
3102                    );
3103                }
3104                // Ensure the types of the source query match the types of the target table,
3105                // installing assignment casts where necessary and possible.
3106                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3107                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3108                let expr = expr.map_err(|e| {
3109                    sql_err!(
3110                        "statement {}: column {} is of type {} but expression is of type {}",
3111                        idx,
3112                        desc.get_name(e.column).quoted(),
3113                        qcx.humanize_scalar_type(&e.target_type, false),
3114                        qcx.humanize_scalar_type(&e.source_type, false),
3115                    )
3116                })?;
3117
3118                // Update the CT's nullability as necessary. The arity checks above
3119                // verified that the two column lists have the same length.
3120                let zip_types = || desc.iter_types().zip(desc_query.iter_types());
3121                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3122                if updated {
3123                    let new_types = zip_types().map(|(ct, q)| {
3124                        let mut ct = ct.clone();
3125                        if q.nullable {
3126                            ct.nullable = true;
3127                        }
3128                        ct
3129                    });
3130                    *desc = RelationDesc::from_names_and_types(
3131                        desc.iter_names().cloned().zip(new_types),
3132                    );
3133                }
3134
3135                expr
3136            }
3137        };
3138        match stmt {
3139            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3140            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3141        }
3142    }
3143    // TODO(ct3): Collect things by output and assert that there is only one (or
3144    // support multiple outputs).
3145    let expr = exprs
3146        .into_iter()
3147        .reduce(|acc, expr| acc.union(expr))
3148        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3149    let dependencies = expr
3150        .depends_on()
3151        .into_iter()
3152        .map(|gid| scx.catalog.resolve_item_id(&gid))
3153        .collect();
3154
3155    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3156    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3157    if let Some(dup) = column_names.iter().duplicates().next() {
3158        sql_bail!("column {} specified more than once", dup.quoted());
3159    }
3160
3161    // Check for an object in the catalog with this same name
3162    let name = match &ct_name {
3163        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3164        ResolvedItemName::ContinualTask { name, .. } => {
3165            let name = scx.allocate_qualified_name(name.clone())?;
3166            let full_name = scx.catalog.resolve_full_name(&name);
3167            let partial_name = PartialItemName::from(full_name.clone());
3168            // For PostgreSQL compatibility, we need to prevent creating this when there
3169            // is an existing object *or* type of the same name.
3170            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3171                return Err(PlanError::ItemAlreadyExists {
3172                    name: full_name.to_string(),
3173                    item_type: item.item_type(),
3174                });
3175            }
3176            name
3177        }
3178        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3179        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3180    };
3181
3182    let as_of = stmt.as_of.map(Timestamp::from);
3183    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3184        name,
3185        placeholder_id,
3186        desc,
3187        input_id: input.global_id(),
3188        with_snapshot: snapshot.unwrap_or(true),
3189        continual_task: MaterializedView {
3190            create_sql,
3191            expr,
3192            dependencies,
3193            column_names,
3194            cluster_id,
3195            non_null_assertions: Vec::new(),
3196            compaction_window: None,
3197            refresh_schedule: None,
3198            as_of,
3199        },
3200    }))
3201}
3202
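/// Extracts the query underlying a single continual task INSERT or DELETE
/// statement, returning `None` if the statement uses a feature (e.g. explicit
/// INSERT columns, RETURNING, or DELETE ... USING) that continual tasks do not
/// support. A DELETE is rewritten as a `SELECT *` over the task itself with the
/// deletion predicate as the `WHERE` clause; the planned result is later
/// negated into retractions.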
3203fn continual_task_query<'a>(
3204    ct_name: &ResolvedItemName,
3205    stmt: &'a ast::ContinualTaskStmt<Aug>,
3206) -> Option<ast::Query<Aug>> {
3207    match stmt {
3208        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3209            table_name: _,
3210            columns,
3211            source,
3212            returning,
3213        }) => {
3214            if !columns.is_empty() || !returning.is_empty() {
3215                return None;
3216            }
3217            match source {
3218                ast::InsertSource::Query(query) => Some(query.clone()),
3219                ast::InsertSource::DefaultValues => None,
3220            }
3221        }
3222        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3223            table_name: _,
3224            alias,
3225            using,
3226            selection,
3227        }) => {
3228            if !using.is_empty() {
3229                return None;
3230            }
3231            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
3232            let from = ast::TableWithJoins {
3233                relation: ast::TableFactor::Table {
3234                    name: ct_name.clone(),
3235                    alias: alias.clone(),
3236                },
3237                joins: Vec::new(),
3238            };
3239            let select = ast::Select {
3240                from: vec![from],
3241                selection: selection.clone(),
3242                distinct: None,
3243                projection: vec![ast::SelectItem::Wildcard],
3244                group_by: Vec::new(),
3245                having: None,
3246                qualify: None,
3247                options: Vec::new(),
3248            };
3249            let query = ast::Query {
3250                ctes: ast::CteBlock::Simple(Vec::new()),
3251                body: ast::SetExpr::Select(Box::new(select)),
3252                order_by: Vec::new(),
3253                limit: None,
3254                offset: None,
3255            };
3256            // Then negate it to turn it into retractions (after planning it).
3257            Some(query)
3258        }
3259    }
3260}
3261
3262generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3263
3264pub fn describe_create_sink(
3265    _: &StatementContext,
3266    _: CreateSinkStatement<Aug>,
3267) -> Result<StatementDesc, PlanError> {
3268    Ok(StatementDesc::new(None))
3269}
3270
3271generate_extracted_config!(
3272    CreateSinkOption,
3273    (Snapshot, bool),
3274    (PartitionStrategy, String),
3275    (Version, u64)
3276);
3277
3278pub fn plan_create_sink(
3279    scx: &StatementContext,
3280    stmt: CreateSinkStatement<Aug>,
3281) -> Result<Plan, PlanError> {
3282    // Check for an object in the catalog with this same name
3283    let Some(name) = stmt.name.clone() else {
3284        return Err(PlanError::MissingName(CatalogItemType::Sink));
3285    };
3286    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3287    let full_name = scx.catalog.resolve_full_name(&name);
3288    let partial_name = PartialItemName::from(full_name.clone());
3289    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3290        return Err(PlanError::ItemAlreadyExists {
3291            name: full_name.to_string(),
3292            item_type: item.item_type(),
3293        });
3294    }
3295
3296    plan_sink(scx, stmt)
3297}
3298
3299/// Plans a sink as if it does not already exist in the catalog, so that the
3300/// planning logic can be shared between CREATE SINK and ALTER SINK. It is the
3301/// responsibility of the callers (`plan_create_sink` and `plan_alter_sink`) to
3302/// check for name collisions where that matters.
3303fn plan_sink(
3304    scx: &StatementContext,
3305    mut stmt: CreateSinkStatement<Aug>,
3306) -> Result<Plan, PlanError> {
3307    let CreateSinkStatement {
3308        name,
3309        in_cluster: _,
3310        from,
3311        connection,
3312        format,
3313        envelope,
3314        if_not_exists,
3315        with_options,
3316    } = stmt.clone();
3317
3318    let Some(name) = name else {
3319        return Err(PlanError::MissingName(CatalogItemType::Sink));
3320    };
3321    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3322
3323    let envelope = match envelope {
3324        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3325        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3326        None => sql_bail!("ENVELOPE clause is required"),
3327    };
3328
3329    let from_name = &from;
3330    let from = scx.get_item_by_resolved_name(&from)?;
3331    if from.id().is_system() {
3332        bail_unsupported!("creating a sink directly on a catalog object");
3333    }
3334    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
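    // Resolve the user-declared KEY columns (if any) to positions in the sink's
    // relation, and check that they form (or are declared NOT ENFORCED to form)
    // a valid key for ENVELOPE UPSERT.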
3335    let key_indices = match &connection {
3336        CreateSinkConnection::Kafka { key, .. } => {
3337            if let Some(key) = key.clone() {
3338                let key_columns = key
3339                    .key_columns
3340                    .into_iter()
3341                    .map(normalize::column_name)
3342                    .collect::<Vec<_>>();
3343                let mut uniq = BTreeSet::new();
3344                for col in key_columns.iter() {
3345                    if !uniq.insert(col) {
3346                        sql_bail!("duplicate column referenced in KEY: {}", col);
3347                    }
3348                }
3349                let indices = key_columns
3350                    .iter()
3351                    .map(|col| -> anyhow::Result<usize> {
3352                        let name_idx =
3353                            desc.get_by_name(col)
3354                                .map(|(idx, _type)| idx)
3355                                .ok_or_else(|| {
3356                                    sql_err!("column referenced in KEY does not exist: {}", col)
3357                                })?;
3358                        if desc.get_unambiguous_name(name_idx).is_none() {
3359                            return Err(sql_err!("column referenced in KEY is ambiguous: {}", col).into());
3360                        }
3361                        Ok(name_idx)
3362                    })
3363                    .collect::<Result<Vec<_>, _>>()?;
3364                let is_valid_key =
3365                    desc.typ().keys.iter().any(|key_columns| {
3366                        key_columns.iter().all(|column| indices.contains(column))
3367                    });
3368
3369                if !is_valid_key && envelope == SinkEnvelope::Upsert {
3370                    if key.not_enforced {
3371                        scx.catalog
3372                            .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3373                                key: key_columns.clone(),
3374                                name: name.item.clone(),
3375                            })
3376                    } else {
3377                        return Err(PlanError::UpsertSinkWithInvalidKey {
3378                            name: from_name.full_name_str(),
3379                            desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3380                            valid_keys: desc
3381                                .typ()
3382                                .keys
3383                                .iter()
3384                                .map(|key| {
3385                                    key.iter()
3386                                        .map(|col| desc.get_name(*col).as_str().into())
3387                                        .collect()
3388                                })
3389                                .collect(),
3390                        });
3391                    }
3392                }
3393                Some(indices)
3394            } else {
3395                None
3396            }
3397        }
3398    };
3399
3400    let headers_index = match &connection {
3401        CreateSinkConnection::Kafka {
3402            headers: Some(headers),
3403            ..
3404        } => {
3405            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3406
3407            match envelope {
3408                SinkEnvelope::Upsert => (),
3409                SinkEnvelope::Debezium => {
3410                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3411                }
3412            };
3413
3414            let headers = normalize::column_name(headers.clone());
3415            let (idx, ty) = desc
3416                .get_by_name(&headers)
3417                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3418
3419            if desc.get_unambiguous_name(idx).is_none() {
3420                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3421            }
3422
3423            match &ty.scalar_type {
3424                ScalarType::Map { value_type, .. }
3425                    if matches!(&**value_type, ScalarType::String | ScalarType::Bytes) => {}
3426                _ => sql_bail!(
3427                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3428                ),
3429            }
3430
3431            Some(idx)
3432        }
3433        _ => None,
3434    };
3435
3436    // Pick the first valid natural relation key, if any.
3437    let relation_key_indices = desc.typ().keys.get(0).cloned();
3438
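    // Project the KEY columns out of the value relation to build the key's
    // relation description.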
3439    let key_desc_and_indices = key_indices.map(|key_indices| {
3440        let cols = desc
3441            .iter()
3442            .map(|(name, ty)| (name.clone(), ty.clone()))
3443            .collect::<Vec<_>>();
3444        let (names, types): (Vec<_>, Vec<_>) =
3445            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3446        let typ = RelationType::new(types);
3447        (RelationDesc::new(typ, names), key_indices)
3448    });
3449
3450    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3451        return Err(PlanError::UpsertSinkWithoutKey);
3452    }
3453
3454    let connection_builder = match connection {
3455        CreateSinkConnection::Kafka {
3456            connection,
3457            options,
3458            ..
3459        } => kafka_sink_builder(
3460            scx,
3461            connection,
3462            options,
3463            format,
3464            relation_key_indices,
3465            key_desc_and_indices,
3466            headers_index,
3467            desc.into_owned(),
3468            envelope,
3469            from.id(),
3470        )?,
3471    };
3472
3473    let CreateSinkOptionExtracted {
3474        snapshot,
3475        version,
3476        partition_strategy: _,
3477        seen: _,
3478    } = with_options.try_into()?;
3479
3480    // WITH SNAPSHOT defaults to true
3481    let with_snapshot = snapshot.unwrap_or(true);
3482    // VERSION defaults to 0
3483    let version = version.unwrap_or(0);
3484
3485    // If no cluster was provided we rewrite the statement's `in_cluster`, so we
3486    // must resolve the cluster before canonicalizing the create statement below.
3488    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3489    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3490
3491    Ok(Plan::CreateSink(CreateSinkPlan {
3492        name,
3493        sink: Sink {
3494            create_sql,
3495            from: from.global_id(),
3496            connection: connection_builder,
3497            envelope,
3498            version,
3499        },
3500        with_snapshot,
3501        if_not_exists,
3502        in_cluster: in_cluster.id(),
3503    }))
3504}
3505
3506fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3507    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3508
3509    let existing_keys = desc
3510        .typ()
3511        .keys
3512        .iter()
3513        .map(|key_columns| {
3514            key_columns
3515                .iter()
3516                .map(|col| desc.get_name(*col).as_str())
3517                .join(", ")
3518        })
3519        .join(", ");
3520
3521    sql_err!(
3522        "Key constraint ({}) conflicts with existing key ({})",
3523        user_keys,
3524        existing_keys
3525    )
3526}
3527
3528/// Created by hand instead of via the `generate_extracted_config!` macro
3529/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3530#[derive(Debug, Default, PartialEq, Clone)]
3531pub struct CsrConfigOptionExtracted {
3532    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3533    pub(crate) avro_key_fullname: Option<String>,
3534    pub(crate) avro_value_fullname: Option<String>,
3535    pub(crate) null_defaults: bool,
3536    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3537    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3538    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3539    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3540}
3541
3542impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3543    type Error = crate::plan::PlanError;
3544    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3545        let mut extracted = CsrConfigOptionExtracted::default();
3546        let mut common_doc_comments = BTreeMap::new();
3547        for option in v {
3548            if !extracted.seen.insert(option.name.clone()) {
3549                return Err(PlanError::Unstructured({
3550                    format!("{} specified more than once", option.name)
3551                }));
3552            }
3553            let option_name = option.name.clone();
3554            let option_name_str = option_name.to_ast_string_simple();
3555            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3556                option_name: option_name.to_ast_string_simple(),
3557                err: e.into(),
3558            };
3559            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3560                val.map(|s| match s {
3561                    WithOptionValue::Value(Value::String(s)) => {
3562                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3563                    }
3564                    _ => Err("must be a string".to_string()),
3565                })
3566                .transpose()
3567                .map_err(PlanError::Unstructured)
3568                .map_err(better_error)
3569            };
3570            match option.name {
3571                CsrConfigOptionName::AvroKeyFullname => {
3572                    extracted.avro_key_fullname =
3573                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3574                }
3575                CsrConfigOptionName::AvroValueFullname => {
3576                    extracted.avro_value_fullname =
3577                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3578                }
3579                CsrConfigOptionName::NullDefaults => {
3580                    extracted.null_defaults =
3581                        <bool>::try_from_value(option.value).map_err(better_error)?;
3582                }
3583                CsrConfigOptionName::AvroDocOn(doc_on) => {
3584                    let value = String::try_from_value(option.value.ok_or_else(|| {
3585                        PlanError::InvalidOptionValue {
3586                            option_name: option_name_str,
3587                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3588                        }
3589                    })?)
3590                    .map_err(better_error)?;
3591                    let key = match doc_on.identifier {
3592                        DocOnIdentifier::Column(ast::ColumnName {
3593                            relation: ResolvedItemName::Item { id, .. },
3594                            column: ResolvedColumnReference::Column { name, index: _ },
3595                        }) => DocTarget::Field {
3596                            object_id: id,
3597                            column_name: name,
3598                        },
3599                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3600                            DocTarget::Type(id)
3601                        }
3602                        _ => unreachable!(),
3603                    };
3604
3605                    match doc_on.for_schema {
3606                        DocOnSchema::KeyOnly => {
3607                            extracted.key_doc_options.insert(key, value);
3608                        }
3609                        DocOnSchema::ValueOnly => {
3610                            extracted.value_doc_options.insert(key, value);
3611                        }
3612                        DocOnSchema::All => {
3613                            common_doc_comments.insert(key, value);
3614                        }
3615                    }
3616                }
3617                CsrConfigOptionName::KeyCompatibilityLevel => {
3618                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3619                }
3620                CsrConfigOptionName::ValueCompatibilityLevel => {
3621                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3622                }
3623            }
3624        }
3625
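        // DOC ON options without an explicit FOR KEY / FOR VALUE apply to both
        // schemas, but never override a more specific option given above.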
3626        for (key, value) in common_doc_comments {
3627            if !extracted.key_doc_options.contains_key(&key) {
3628                extracted.key_doc_options.insert(key.clone(), value.clone());
3629            }
3630            if !extracted.value_doc_options.contains_key(&key) {
3631                extracted.value_doc_options.insert(key, value);
3632            }
3633        }
3634        Ok(extracted)
3635    }
3636}
3637
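/// Builds the connection description for a Kafka sink from the statement's
/// connection, FORMAT, ENVELOPE, KEY, HEADERS, and topic options. The
/// `key_desc_and_indices` and `headers_index` arguments are expected to have
/// been validated by `plan_sink`.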
3638fn kafka_sink_builder(
3639    scx: &StatementContext,
3640    connection: ResolvedItemName,
3641    options: Vec<KafkaSinkConfigOption<Aug>>,
3642    format: Option<FormatSpecifier<Aug>>,
3643    relation_key_indices: Option<Vec<usize>>,
3644    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3645    headers_index: Option<usize>,
3646    value_desc: RelationDesc,
3647    envelope: SinkEnvelope,
3648    sink_from: CatalogItemId,
3649) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3650    // Get Kafka connection.
3651    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3652    let connection_id = connection_item.id();
3653    match connection_item.connection()? {
3654        Connection::Kafka(_) => (),
3655        _ => sql_bail!(
3656            "{} is not a kafka connection",
3657            scx.catalog.resolve_full_name(connection_item.name())
3658        ),
3659    };
3660
3661    let KafkaSinkConfigOptionExtracted {
3662        topic,
3663        compression_type,
3664        partition_by,
3665        progress_group_id_prefix,
3666        transactional_id_prefix,
3667        legacy_ids,
3668        topic_config,
3669        topic_metadata_refresh_interval,
3670        topic_partition_count,
3671        topic_replication_factor,
3672        seen: _,
3673    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3674
3675    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3676        (Some(_), Some(true)) => {
3677            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3678        }
3679        (None, Some(true)) => KafkaIdStyle::Legacy,
3680        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3681    };
3682
3683    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3684        (Some(_), Some(true)) => {
3685            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3686        }
3687        (None, Some(true)) => KafkaIdStyle::Legacy,
3688        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3689    };
3690
3691    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3692
3693    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3694        // This is a librdkafka-enforced restriction that, if violated,
3695        // would result in a runtime error for the sink.
3696        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3697    }
3698
3699    let assert_positive = |val: Option<i32>, name: &str| {
3700        if let Some(val) = val {
3701            if val <= 0 {
3702                sql_bail!("{} must be a positive integer", name);
3703            }
3704        }
3705        val.map(NonNeg::try_from)
3706            .transpose()
3707            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3708    };
3709    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3710    let topic_replication_factor =
3711        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3712
3713    // Helper to parse Avro schema registry connection options for format
3714    // specifiers that use Avro for either key or value encoding.
3715    let gen_avro_schema_options = |conn| {
3716        let CsrConnectionAvro {
3717            connection:
3718                CsrConnection {
3719                    connection,
3720                    options,
3721                },
3722            seed,
3723            key_strategy,
3724            value_strategy,
3725        } = conn;
3726        if seed.is_some() {
3727            sql_bail!("SEED option does not make sense with sinks");
3728        }
3729        if key_strategy.is_some() {
3730            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3731        }
3732        if value_strategy.is_some() {
3733            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3734        }
3735
3736        let item = scx.get_item_by_resolved_name(&connection)?;
3737        let csr_connection = match item.connection()? {
3738            Connection::Csr(_) => item.id(),
3739            _ => {
3740                sql_bail!(
3741                    "{} is not a schema registry connection",
3742                    scx.catalog
3743                        .resolve_full_name(item.name())
3744                        .to_string()
3745                        .quoted()
3746                )
3747            }
3748        };
3749        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3750
3751        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3752            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3753        }
3754
3755        if key_desc_and_indices.is_some()
3756            && (extracted_options.avro_key_fullname.is_some()
3757                ^ extracted_options.avro_value_fullname.is_some())
3758        {
3759            sql_bail!(
3760                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3761            );
3762        }
3763
3764        Ok((csr_connection, extracted_options))
3765    };
3766
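    // Map a single FORMAT specifier to the concrete key or value format,
    // generating Avro schemas against the schema registry connection where
    // needed.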
3767    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3768        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3769        Format::Bytes if desc.arity() == 1 => {
3770            let col_type = &desc.typ().column_types[0].scalar_type;
3771            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3772                bail_unsupported!(format!(
3773                    "BYTES format with non-encodable type: {:?}",
3774                    col_type
3775                ));
3776            }
3777
3778            Ok(KafkaSinkFormatType::Bytes)
3779        }
3780        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3781        Format::Bytes | Format::Text => {
3782            bail_unsupported!("BYTES or TEXT format with multiple columns")
3783        }
3784        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3785        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3786            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3787            let schema = if is_key {
3788                AvroSchemaGenerator::new(
3789                    desc.clone(),
3790                    false,
3791                    options.key_doc_options,
3792                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3793                    options.null_defaults,
3794                    Some(sink_from),
3795                    false,
3796                )?
3797                .schema()
3798                .to_string()
3799            } else {
3800                AvroSchemaGenerator::new(
3801                    desc.clone(),
3802                    matches!(envelope, SinkEnvelope::Debezium),
3803                    options.value_doc_options,
3804                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3805                    options.null_defaults,
3806                    Some(sink_from),
3807                    true,
3808                )?
3809                .schema()
3810                .to_string()
3811            };
3812            Ok(KafkaSinkFormatType::Avro {
3813                schema,
3814                compatibility_level: if is_key {
3815                    options.key_compatibility_level
3816                } else {
3817                    options.value_compatibility_level
3818                },
3819                csr_connection,
3820            })
3821        }
3822        format => bail_unsupported!(format!("sink format {:?}", format)),
3823    };
3824
3825    let partition_by = match &partition_by {
3826        Some(partition_by) => {
3827            let mut scope = Scope::from_source(None, value_desc.iter_names());
3828
3829            match envelope {
3830                SinkEnvelope::Upsert => (),
3831                SinkEnvelope::Debezium => {
3832                    let key_indices: HashSet<_> = key_desc_and_indices
3833                        .as_ref()
3834                        .map(|(_desc, indices)| indices.as_slice())
3835                        .unwrap_or_default()
3836                        .into_iter()
3837                        .collect();
3838                    for (i, item) in scope.items.iter_mut().enumerate() {
3839                        if !key_indices.contains(&i) {
3840                            item.error_if_referenced = Some(|_table, column| {
3841                                PlanError::InvalidPartitionByEnvelopeDebezium {
3842                                    column_name: column.to_string(),
3843                                }
3844                            });
3845                        }
3846                    }
3847                }
3848            };
3849
3850            let ecx = &ExprContext {
3851                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3852                name: "PARTITION BY",
3853                scope: &scope,
3854                relation_type: value_desc.typ(),
3855                allow_aggregates: false,
3856                allow_subqueries: false,
3857                allow_parameters: false,
3858                allow_windows: false,
3859            };
3860            let expr = plan_expr(ecx, partition_by)?.cast_to(
3861                ecx,
3862                CastContext::Assignment,
3863                &ScalarType::UInt64,
3864            )?;
3865            let expr = expr.lower_uncorrelated()?;
3866
3867            Some(expr)
3868        }
3869        _ => None,
3870    };
3871
3872    // Map from the format specifier of the statement to the individual key/value formats for the sink.
3873    let format = match format {
3874        Some(FormatSpecifier::KeyValue { key, value }) => {
3875            let key_format = match key_desc_and_indices.as_ref() {
3876                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
3877                None => None,
3878            };
3879            KafkaSinkFormat {
3880                value_format: map_format(value, &value_desc, false)?,
3881                key_format,
3882            }
3883        }
3884        Some(FormatSpecifier::Bare(format)) => {
3885            let key_format = match key_desc_and_indices.as_ref() {
3886                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
3887                None => None,
3888            };
3889            KafkaSinkFormat {
3890                value_format: map_format(format, &value_desc, false)?,
3891                key_format,
3892            }
3893        }
3894        None => bail_unsupported!("sink without format"),
3895    };
3896
3897    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
3898        connection_id,
3899        connection: connection_id,
3900        format,
3901        topic: topic_name,
3902        relation_key_indices,
3903        key_desc_and_indices,
3904        headers_index,
3905        value_desc,
3906        partition_by,
3907        compression_type,
3908        progress_group_id,
3909        transactional_id,
3910        topic_options: KafkaTopicOptions {
3911            partition_count: topic_partition_count,
3912            replication_factor: topic_replication_factor,
3913            topic_config: topic_config.unwrap_or_default(),
3914        },
3915        topic_metadata_refresh_interval,
3916    }))
3917}
3918
3919pub fn describe_create_index(
3920    _: &StatementContext,
3921    _: CreateIndexStatement<Aug>,
3922) -> Result<StatementDesc, PlanError> {
3923    Ok(StatementDesc::new(None))
3924}
3925
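/// Plans a `CREATE INDEX` statement, filling in the key expressions (for a
/// default index) and the index name when they are not given. For example
/// (illustrative names):
///
/// ```sql
/// CREATE INDEX t_a_b_idx ON t (a, b);
/// CREATE DEFAULT INDEX ON t; -- keys and name are derived
/// ```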
3926pub fn plan_create_index(
3927    scx: &StatementContext,
3928    mut stmt: CreateIndexStatement<Aug>,
3929) -> Result<Plan, PlanError> {
3930    let CreateIndexStatement {
3931        name,
3932        on_name,
3933        in_cluster,
3934        key_parts,
3935        with_options,
3936        if_not_exists,
3937    } = &mut stmt;
3938    let on = scx.get_item_by_resolved_name(on_name)?;
3939    let on_item_type = on.item_type();
3940
3941    if !matches!(
3942        on_item_type,
3943        CatalogItemType::View
3944            | CatalogItemType::MaterializedView
3945            | CatalogItemType::Source
3946            | CatalogItemType::Table
3947    ) {
3948        sql_bail!(
3949            "index cannot be created on {} because it is a {}",
3950            on_name.full_name_str(),
3951            on.item_type()
3952        )
3953    }
3954
3955    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
3956
3957    let filled_key_parts = match key_parts {
3958        Some(kp) => kp.to_vec(),
3959        None => {
3960            // `key_parts` is None if we're creating a "default" index.
3961            on.desc(&scx.catalog.resolve_full_name(on.name()))?
3962                .typ()
3963                .default_key()
3964                .iter()
3965                .map(|i| match on_desc.get_unambiguous_name(*i) {
3966                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
3967                    _ => Expr::Value(Value::Number((i + 1).to_string())),
3968                })
3969                .collect()
3970        }
3971    };
3972    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
3973
3974    let index_name = if let Some(name) = name {
3975        QualifiedItemName {
3976            qualifiers: on.name().qualifiers.clone(),
3977            item: normalize::ident(name.clone()),
3978        }
3979    } else {
3980        let mut idx_name = QualifiedItemName {
3981            qualifiers: on.name().qualifiers.clone(),
3982            item: on.name().item.clone(),
3983        };
3984        if key_parts.is_none() {
3985            // We're trying to create the "default" index.
3986            idx_name.item += "_primary_idx";
3987        } else {
3988            // Use Postgres's scheme for automatically naming indexes:
3989            // `<table>_<_-separated indexed expressions>_idx`
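            // e.g., an index on columns (a, b) of table t is named t_a_b_idx.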
3990            let index_name_col_suffix = keys
3991                .iter()
3992                .map(|k| match k {
3993                    mz_expr::MirScalarExpr::Column(i, name) => {
3994                        match (on_desc.get_unambiguous_name(*i), &name.0) {
3995                            (Some(col_name), _) => col_name.to_string(),
3996                            (None, Some(name)) => name.to_string(),
3997                            (None, None) => format!("{}", i + 1),
3998                        }
3999                    }
4000                    _ => "expr".to_string(),
4001                })
4002                .join("_");
4003            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4004                .expect("write on strings cannot fail");
4005            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4006        }
4007
4008        if !*if_not_exists {
4009            scx.catalog.find_available_name(idx_name)
4010        } else {
4011            idx_name
4012        }
4013    };
4014
4015    // Check for an object in the catalog with this same name
4016    let full_name = scx.catalog.resolve_full_name(&index_name);
4017    let partial_name = PartialItemName::from(full_name.clone());
4018    // For PostgreSQL compatibility, we need to prevent creating indexes when
4019    // there is an existing object *or* type of the same name.
4020    //
4021    // Technically, we only need to prevent coexistence of indexes and types
4022    // that have an associated relation (record types but not list/map types).
4023    // Enforcing that would be more complicated, though. It's backwards
4024    // compatible to weaken this restriction in the future.
4025    if let (Ok(item), false, false) = (
4026        scx.catalog.resolve_item_or_type(&partial_name),
4027        *if_not_exists,
4028        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4029    ) {
4030        return Err(PlanError::ItemAlreadyExists {
4031            name: full_name.to_string(),
4032            item_type: item.item_type(),
4033        });
4034    }
4035
4036    let options = plan_index_options(scx, with_options.clone())?;
4037    let cluster_id = match in_cluster {
4038        None => scx.resolve_cluster(None)?.id(),
4039        Some(in_cluster) => in_cluster.id,
4040    };
4041
4042    *in_cluster = Some(ResolvedClusterName {
4043        id: cluster_id,
4044        print_name: None,
4045    });
4046
4047    // Normalize `stmt`.
4048    *name = Some(Ident::new(index_name.item.clone())?);
4049    *key_parts = Some(filled_key_parts);
4050    let if_not_exists = *if_not_exists;
4051
4052    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4053    let compaction_window = options.iter().find_map(|o| {
4054        #[allow(irrefutable_let_patterns)]
4055        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4056            Some(lcw.clone())
4057        } else {
4058            None
4059        }
4060    });
4061
4062    Ok(Plan::CreateIndex(CreateIndexPlan {
4063        name: index_name,
4064        index: Index {
4065            create_sql,
4066            on: on.global_id(),
4067            keys,
4068            cluster_id,
4069            compaction_window,
4070        },
4071        if_not_exists,
4072    }))
4073}
4074
4075pub fn describe_create_type(
4076    _: &StatementContext,
4077    _: CreateTypeStatement<Aug>,
4078) -> Result<StatementDesc, PlanError> {
4079    Ok(StatementDesc::new(None))
4080}
4081
4082pub fn plan_create_type(
4083    scx: &StatementContext,
4084    stmt: CreateTypeStatement<Aug>,
4085) -> Result<Plan, PlanError> {
4086    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4087    let CreateTypeStatement { name, as_type, .. } = stmt;
4088
4089    fn validate_data_type(
4090        scx: &StatementContext,
4091        data_type: ResolvedDataType,
4092        as_type: &str,
4093        key: &str,
4094    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4095        let (id, modifiers) = match data_type {
4096            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4097            _ => sql_bail!(
4098                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4099                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4100                as_type,
4101                key,
4102                data_type.human_readable_name(),
4103            ),
4104        };
4105
4106        let item = scx.catalog.get_item(&id);
4107        match item.type_details() {
4108            None => sql_bail!(
4109                "{} must be of class type, but received {} which is of class {}",
4110                key,
4111                scx.catalog.resolve_full_name(item.name()),
4112                item.item_type()
4113            ),
4114            Some(CatalogTypeDetails {
4115                typ: CatalogType::Char,
4116                ..
4117            }) => {
4118                bail_unsupported!("embedding char type in a list or map")
4119            }
4120            _ => {
4121                // Validate that the modifiers are actually valid.
4122                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4123
4124                Ok((id, modifiers))
4125            }
4126        }
4127    }
4128
4129    let inner = match as_type {
4130        CreateTypeAs::List { options } => {
4131            let CreateTypeListOptionExtracted {
4132                element_type,
4133                seen: _,
4134            } = CreateTypeListOptionExtracted::try_from(options)?;
4135            let element_type =
4136                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4137            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4138            CatalogType::List {
4139                element_reference: id,
4140                element_modifiers: modifiers,
4141            }
4142        }
4143        CreateTypeAs::Map { options } => {
4144            let CreateTypeMapOptionExtracted {
4145                key_type,
4146                value_type,
4147                seen: _,
4148            } = CreateTypeMapOptionExtracted::try_from(options)?;
4149            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4150            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4151            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4152            let (value_id, value_modifiers) =
4153                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4154            CatalogType::Map {
4155                key_reference: key_id,
4156                key_modifiers,
4157                value_reference: value_id,
4158                value_modifiers,
4159            }
4160        }
4161        CreateTypeAs::Record { column_defs } => {
4162            let mut fields = vec![];
4163            for column_def in column_defs {
4164                let data_type = column_def.data_type;
4165                let key = ident(column_def.name.clone());
4166                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4167                fields.push(CatalogRecordField {
4168                    name: ColumnName::from(key.clone()),
4169                    type_reference: id,
4170                    type_modifiers: modifiers,
4171                });
4172            }
4173            CatalogType::Record { fields }
4174        }
4175    };
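    // Illustrative statements for the three variants handled above (a sketch;
    // the type and column names are hypothetical):
    //   CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
    //   CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);
    //   CREATE TYPE point AS (x int4, y int4);
    // In each case the referenced types must themselves be named types;
    // `validate_data_type` rejects anonymous ones such as `int4 list`.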
4176
4177    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4178
4179    // Check for an object in the catalog with this same name
4180    let full_name = scx.catalog.resolve_full_name(&name);
4181    let partial_name = PartialItemName::from(full_name.clone());
4182    // For PostgreSQL compatibility, we need to prevent creating types when
4183    // there is an existing object *or* type of the same name.
4184    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4185        if item.item_type().conflicts_with_type() {
4186            return Err(PlanError::ItemAlreadyExists {
4187                name: full_name.to_string(),
4188                item_type: item.item_type(),
4189            });
4190        }
4191    }
4192
4193    Ok(Plan::CreateType(CreateTypePlan {
4194        name,
4195        typ: Type { create_sql, inner },
4196    }))
4197}
4198
4199generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4200
4201generate_extracted_config!(
4202    CreateTypeMapOption,
4203    (KeyType, ResolvedDataType),
4204    (ValueType, ResolvedDataType)
4205);
4206
4207#[derive(Debug)]
4208pub enum PlannedAlterRoleOption {
4209    Attributes(PlannedRoleAttributes),
4210    Variable(PlannedRoleVariable),
4211}
4212
4213#[derive(Debug, Clone)]
4214pub struct PlannedRoleAttributes {
4215    pub inherit: Option<bool>,
4216    pub password: Option<Password>,
4217    /// `nopassword` is set to `true` if the password from the parser is `None`.
4218    /// This is semantically different from not supplying a password at all,
4219    /// as it allows a password to be unset.
4220    pub nopassword: Option<bool>,
4221    pub superuser: Option<bool>,
4222    pub login: Option<bool>,
4223}
4224
4225fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4226    let mut planned_attributes = PlannedRoleAttributes {
4227        inherit: None,
4228        password: None,
4229        superuser: None,
4230        login: None,
4231        nopassword: None,
4232    };
4233
4234    for option in options {
4235        match option {
4236            RoleAttribute::Inherit | RoleAttribute::NoInherit
4237                if planned_attributes.inherit.is_some() =>
4238            {
4239                sql_bail!("conflicting or redundant options");
4240            }
4241            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4242                bail_never_supported!(
4243                    "CREATECLUSTER attribute",
4244                    "sql/create-role/#details",
4245                    "Use system privileges instead."
4246                );
4247            }
4248            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4249                bail_never_supported!(
4250                    "CREATEDB attribute",
4251                    "sql/create-role/#details",
4252                    "Use system privileges instead."
4253                );
4254            }
4255            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4256                bail_never_supported!(
4257                    "CREATEROLE attribute",
4258                    "sql/create-role/#details",
4259                    "Use system privileges instead."
4260                );
4261            }
4262            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4263                sql_bail!("conflicting or redundant options");
4264            }
4265
4266            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4267            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4268            RoleAttribute::Password(password) => {
4269                if let Some(password) = password {
4270                    planned_attributes.password = Some(password.into());
4271                } else {
4272                    planned_attributes.nopassword = Some(true);
4273                }
4274            }
4275            RoleAttribute::SuperUser => {
4276                if planned_attributes.superuser == Some(false) {
4277                    sql_bail!("conflicting or redundant options");
4278                }
4279                planned_attributes.superuser = Some(true);
4280            }
4281            RoleAttribute::NoSuperUser => {
4282                if planned_attributes.superuser == Some(true) {
4283                    sql_bail!("conflicting or redundant options");
4284                }
4285                planned_attributes.superuser = Some(false);
4286            }
4287            RoleAttribute::Login => {
4288                if planned_attributes.login == Some(false) {
4289                    sql_bail!("conflicting or redundant options");
4290                }
4291                planned_attributes.login = Some(true);
4292            }
4293            RoleAttribute::NoLogin => {
4294                if planned_attributes.login == Some(true) {
4295                    sql_bail!("conflicting or redundant options");
4296                }
4297                planned_attributes.login = Some(false);
4298            }
4299        }
4300    }
4301    if planned_attributes.inherit == Some(false) {
4302        bail_unsupported!("non-inherit roles");
4303    }
4304
4305    Ok(planned_attributes)
4306}
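// A sketch of how the conflict checks above behave (attribute spellings as in
// the parser; exact error text may differ):
//   CREATE ROLE r LOGIN NOLOGIN             -- "conflicting or redundant options"
//   CREATE ROLE r PASSWORD 'a' PASSWORD 'b' -- "conflicting or redundant options"
//   CREATE ROLE r NOINHERIT                 -- unsupported: non-inherit roles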
4307
4308#[derive(Debug)]
4309pub enum PlannedRoleVariable {
4310    Set { name: String, value: VariableValue },
4311    Reset { name: String },
4312}
4313
4314impl PlannedRoleVariable {
4315    pub fn name(&self) -> &str {
4316        match self {
4317            PlannedRoleVariable::Set { name, .. } => name,
4318            PlannedRoleVariable::Reset { name } => name,
4319        }
4320    }
4321}
4322
4323fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4324    let plan = match variable {
4325        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4326            name: name.to_string(),
4327            value: scl::plan_set_variable_to(value)?,
4328        },
4329        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4330            name: name.to_string(),
4331        },
4332    };
4333    Ok(plan)
4334}
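// For example (a sketch): `ALTER ROLE r SET search_path = 'public'` plans
// `PlannedRoleVariable::Set`, while `ALTER ROLE r RESET search_path` plans
// `PlannedRoleVariable::Reset`.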
4335
4336pub fn describe_create_role(
4337    _: &StatementContext,
4338    _: CreateRoleStatement,
4339) -> Result<StatementDesc, PlanError> {
4340    Ok(StatementDesc::new(None))
4341}
4342
4343pub fn plan_create_role(
4344    _: &StatementContext,
4345    CreateRoleStatement { name, options }: CreateRoleStatement,
4346) -> Result<Plan, PlanError> {
4347    let attributes = plan_role_attributes(options)?;
4348    Ok(Plan::CreateRole(CreateRolePlan {
4349        name: normalize::ident(name),
4350        attributes: attributes.into(),
4351    }))
4352}
4353
4354pub fn plan_create_network_policy(
4355    ctx: &StatementContext,
4356    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4357) -> Result<Plan, PlanError> {
4358    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4359    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4360
4361    let Some(rule_defs) = policy_options.rules else {
4362        sql_bail!("RULES must be specified when creating network policies.");
4363    };
4364
4365    let mut rules = vec![];
4366    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4367        let NetworkPolicyRuleOptionExtracted {
4368            seen: _,
4369            direction,
4370            action,
4371            address,
4372        } = options.try_into()?;
4373        let (direction, action, address) = match (direction, action, address) {
4374            (Some(direction), Some(action), Some(address)) => (
4375                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4376                NetworkPolicyRuleAction::try_from(action.as_str())?,
4377                PolicyAddress::try_from(address.as_str())?,
4378            ),
4379            (_, _, _) => {
4380                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4381            }
4382        };
4383        rules.push(NetworkPolicyRule {
4384            name: normalize::ident(name),
4385            direction,
4386            action,
4387            address,
4388        });
4389    }
4390
4391    if rules.len()
4392        > ctx
4393            .catalog
4394            .system_vars()
4395            .max_rules_per_network_policy()
4396            .try_into()?
4397    {
4398        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4399    }
4400
4401    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4402        name: normalize::ident(name),
4403        rules,
4404    }))
4405}
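// An illustrative statement for the planning above (a sketch; the policy and
// rule names are hypothetical):
//   CREATE NETWORK POLICY office_policy (
//       RULES (
//           vpn (DIRECTION = 'ingress', ACTION = 'allow', ADDRESS = '10.0.0.0/8')
//       )
//   );
// Omitting any of DIRECTION, ACTION, or ADDRESS in a rule is an error, as is
// exceeding `max_rules_per_network_policy`.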
4406
4407pub fn plan_alter_network_policy(
4408    ctx: &StatementContext,
4409    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4410) -> Result<Plan, PlanError> {
4411    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4412
4413    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4414    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4415
4416    let Some(rule_defs) = policy_options.rules else {
4417        sql_bail!("RULES must be specified when altering network policies.");
4418    };
4419
4420    let mut rules = vec![];
4421    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4422        let NetworkPolicyRuleOptionExtracted {
4423            seen: _,
4424            direction,
4425            action,
4426            address,
4427        } = options.try_into()?;
4428
4429        let (direction, action, address) = match (direction, action, address) {
4430            (Some(direction), Some(action), Some(address)) => (
4431                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4432                NetworkPolicyRuleAction::try_from(action.as_str())?,
4433                PolicyAddress::try_from(address.as_str())?,
4434            ),
4435            (_, _, _) => {
4436                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4437            }
4438        };
4439        rules.push(NetworkPolicyRule {
4440            name: normalize::ident(name),
4441            direction,
4442            action,
4443            address,
4444        });
4445    }
4446    if rules.len()
4447        > ctx
4448            .catalog
4449            .system_vars()
4450            .max_rules_per_network_policy()
4451            .try_into()?
4452    {
4453        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4454    }
4455
4456    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4457        id: policy.id(),
4458        name: normalize::ident(name),
4459        rules,
4460    }))
4461}
4462
4463pub fn describe_create_cluster(
4464    _: &StatementContext,
4465    _: CreateClusterStatement<Aug>,
4466) -> Result<StatementDesc, PlanError> {
4467    Ok(StatementDesc::new(None))
4468}
4469
4470// WARNING:
4471// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4472// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4473// that option stays the same. If you were to give a default value here, then not giving that option
4474// to ALTER CLUSTER would always reset the value of that option to the default.
4475generate_extracted_config!(
4476    ClusterOption,
4477    (AvailabilityZones, Vec<String>),
4478    (Disk, bool),
4479    (IntrospectionDebugging, bool),
4480    (IntrospectionInterval, OptionalDuration),
4481    (Managed, bool),
4482    (Replicas, Vec<ReplicaDefinition<Aug>>),
4483    (ReplicationFactor, u32),
4484    (Size, String),
4485    (Schedule, ClusterScheduleOptionValue),
4486    (WorkloadClass, OptionalString)
4487);
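// To make the warning above concrete: if `ReplicationFactor` hypothetically
// carried `Default(1)`, then `ALTER CLUSTER c SET (SIZE '...')` without a
// REPLICATION FACTOR option would silently reset the cluster's replication
// factor to 1, rather than leaving it unchanged.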
4488
4489generate_extracted_config!(
4490    NetworkPolicyOption,
4491    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4492);
4493
4494generate_extracted_config!(
4495    NetworkPolicyRuleOption,
4496    (Direction, String),
4497    (Action, String),
4498    (Address, String)
4499);
4500
4501generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4502
4503generate_extracted_config!(
4504    ClusterAlterUntilReadyOption,
4505    (Timeout, Duration),
4506    (OnTimeout, String)
4507);
4508
4509generate_extracted_config!(
4510    ClusterFeature,
4511    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4512    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4513    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4514    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4515    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4516    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4517    (
4518        EnableProjectionPushdownAfterRelationCse,
4519        Option<bool>,
4520        Default(None)
4521    )
4522);
4523
4524/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4525///
4526/// The reverse of [`unplan_create_cluster`].
4527pub fn plan_create_cluster(
4528    scx: &StatementContext,
4529    stmt: CreateClusterStatement<Aug>,
4530) -> Result<Plan, PlanError> {
4531    let plan = plan_create_cluster_inner(scx, stmt)?;
4532
4533    // Roundtrip through unplan and make sure that we end up with the same plan.
4534    if let CreateClusterVariant::Managed(_) = &plan.variant {
4535        let stmt = unplan_create_cluster(scx, plan.clone())
4536            .map_err(|e| PlanError::Replan(e.to_string()))?;
4537        let create_sql = stmt.to_ast_string_stable();
4538        let stmt = parse::parse(&create_sql)
4539            .map_err(|e| PlanError::Replan(e.to_string()))?
4540            .into_element()
4541            .ast;
4542        let (stmt, _resolved_ids) =
4543            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4544        let stmt = match stmt {
4545            Statement::CreateCluster(stmt) => stmt,
4546            stmt => {
4547                return Err(PlanError::Replan(format!(
4548                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4549                )));
4550            }
4551        };
4552        let replan =
4553            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4554        if plan != replan {
4555            return Err(PlanError::Replan(format!(
4556                "replan does not match: plan={plan:?}, replan={replan:?}"
4557            )));
4558        }
4559    }
4560
4561    Ok(Plan::CreateCluster(plan))
4562}
4563
4564pub fn plan_create_cluster_inner(
4565    scx: &StatementContext,
4566    CreateClusterStatement {
4567        name,
4568        options,
4569        features,
4570    }: CreateClusterStatement<Aug>,
4571) -> Result<CreateClusterPlan, PlanError> {
4572    let ClusterOptionExtracted {
4573        availability_zones,
4574        introspection_debugging,
4575        introspection_interval,
4576        managed,
4577        replicas,
4578        replication_factor,
4579        seen: _,
4580        size,
4581        disk: disk_in,
4582        schedule,
4583        workload_class,
4584    }: ClusterOptionExtracted = options.try_into()?;
4585
4586    let managed = managed.unwrap_or_else(|| replicas.is_none());
4587
4588    if !scx.catalog.active_role_id().is_system() {
4589        if !features.is_empty() {
4590            sql_bail!("FEATURES not supported for non-system users");
4591        }
4592        if workload_class.is_some() {
4593            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4594        }
4595    }
4596
4597    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4598    let workload_class = workload_class.and_then(|v| v.0);
4599
4600    if managed {
4601        if replicas.is_some() {
4602            sql_bail!("REPLICAS not supported for managed clusters");
4603        }
4604        let Some(size) = size else {
4605            sql_bail!("SIZE must be specified for managed clusters");
4606        };
4607
4608        let mut disk_default = scx.catalog.system_vars().disk_cluster_replicas_default();
4609        // HACK(benesch): disk is always enabled for v2 cluster sizes; explicitly
4610        // specifying `DISK = FALSE` for them is an error, and `DISK = TRUE` is redundant.
4611        //
4612        // The long term plan is to phase out the v1 cluster sizes, at which
4613        // point we'll be able to remove the `DISK` option entirely and simply
4614        // always enable disk.
4615        if scx.catalog.is_cluster_size_cc(&size) {
4616            if disk_in == Some(false) {
4617                sql_bail!(
4618                    "disabling the DISK option is not supported for non-legacy cluster sizes because disk is always enabled"
4619                );
4620            }
4621            disk_default = true;
4622        }
4623        // Only require the feature flag if `DISK` was explicitly specified and it does not match
4624        // the default value.
4625        if matches!(disk_in, Some(disk) if disk != disk_default) {
4626            scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
4627        }
4628        let disk = disk_in.unwrap_or(disk_default);
4629
4630        let compute = plan_compute_replica_config(
4631            introspection_interval,
4632            introspection_debugging.unwrap_or(false),
4633        )?;
4634
4635        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4636            replication_factor.unwrap_or_else(|| {
4637                scx.catalog
4638                    .system_vars()
4639                    .default_cluster_replication_factor()
4640            })
4641        } else {
4642            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4643            if replication_factor.is_some() {
4644                sql_bail!(
4645                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4646                );
4647            }
4648            // If we have a non-trivial schedule, start with no replicas, to avoid
4649            // quickly going back and forth if the schedule doesn't want a replica
4650            // at first.
4651            0
4652        };
4653        let availability_zones = availability_zones.unwrap_or_default();
4654
4655        if !availability_zones.is_empty() {
4656            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4657        }
4658
4659        // Plan OptimizerFeatureOverrides.
4660        let ClusterFeatureExtracted {
4661            reoptimize_imported_views,
4662            enable_eager_delta_joins,
4663            enable_new_outer_join_lowering,
4664            enable_variadic_left_join_lowering,
4665            enable_letrec_fixpoint_analysis,
4666            enable_join_prioritize_arranged,
4667            enable_projection_pushdown_after_relation_cse,
4668            seen: _,
4669        } = ClusterFeatureExtracted::try_from(features)?;
4670        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4671            reoptimize_imported_views,
4672            enable_eager_delta_joins,
4673            enable_new_outer_join_lowering,
4674            enable_variadic_left_join_lowering,
4675            enable_letrec_fixpoint_analysis,
4676            enable_join_prioritize_arranged,
4677            enable_projection_pushdown_after_relation_cse,
4678            ..Default::default()
4679        };
4680
4681        let schedule = plan_cluster_schedule(schedule)?;
4682
4683        Ok(CreateClusterPlan {
4684            name: normalize::ident(name),
4685            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4686                replication_factor,
4687                size,
4688                availability_zones,
4689                compute,
4690                disk,
4691                optimizer_feature_overrides,
4692                schedule,
4693            }),
4694            workload_class,
4695        })
4696    } else {
4697        let Some(replica_defs) = replicas else {
4698            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4699        };
4700        if availability_zones.is_some() {
4701            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4702        }
4703        if replication_factor.is_some() {
4704            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4705        }
4706        if introspection_debugging.is_some() {
4707            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4708        }
4709        if introspection_interval.is_some() {
4710            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4711        }
4712        if size.is_some() {
4713            sql_bail!("SIZE not supported for unmanaged clusters");
4714        }
4715        if disk_in.is_some() {
4716            sql_bail!("DISK not supported for unmanaged clusters");
4717        }
4718        if !features.is_empty() {
4719            sql_bail!("FEATURES not supported for unmanaged clusters");
4720        }
4721        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4722            sql_bail!(
4723                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4724            );
4725        }
4726
4727        let mut replicas = vec![];
4728        for ReplicaDefinition { name, options } in replica_defs {
4729            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4730        }
4731
4732        Ok(CreateClusterPlan {
4733            name: normalize::ident(name),
4734            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4735            workload_class,
4736        })
4737    }
4738}
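// A sketch of how the `managed` default above plays out (the size strings are
// hypothetical):
//   CREATE CLUSTER c (SIZE '100cc');                 -- managed: no REPLICAS given
//   CREATE CLUSTER c (REPLICAS (r1 (SIZE '100cc'))); -- unmanaged: REPLICAS given
//   CREATE CLUSTER c (MANAGED = false, ...);         -- unmanaged, explicitly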
4739
4740/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4741///
4742/// The reverse of [`plan_create_cluster`].
4743pub fn unplan_create_cluster(
4744    scx: &StatementContext,
4745    CreateClusterPlan {
4746        name,
4747        variant,
4748        workload_class,
4749    }: CreateClusterPlan,
4750) -> Result<CreateClusterStatement<Aug>, PlanError> {
4751    match variant {
4752        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4753            replication_factor,
4754            size,
4755            availability_zones,
4756            compute,
4757            disk,
4758            optimizer_feature_overrides,
4759            schedule,
4760        }) => {
4761            let schedule = unplan_cluster_schedule(schedule);
4762            let OptimizerFeatureOverrides {
4763                enable_consolidate_after_union_negate: _,
4764                enable_reduce_mfp_fusion: _,
4765                enable_cardinality_estimates: _,
4766                persist_fast_path_limit: _,
4767                reoptimize_imported_views,
4768                enable_eager_delta_joins,
4769                enable_new_outer_join_lowering,
4770                enable_variadic_left_join_lowering,
4771                enable_letrec_fixpoint_analysis,
4772                enable_reduce_reduction: _,
4773                enable_join_prioritize_arranged,
4774                enable_projection_pushdown_after_relation_cse,
4775                enable_less_reduce_in_eqprop: _,
4776                enable_dequadratic_eqprop_map: _,
4777            } = optimizer_feature_overrides;
4778            // Fields destructured above but not listed below are not wired up to cluster features.
4779            let features_extracted = ClusterFeatureExtracted {
4780                // Seen is ignored when unplanning.
4781                seen: Default::default(),
4782                reoptimize_imported_views,
4783                enable_eager_delta_joins,
4784                enable_new_outer_join_lowering,
4785                enable_variadic_left_join_lowering,
4786                enable_letrec_fixpoint_analysis,
4787                enable_join_prioritize_arranged,
4788                enable_projection_pushdown_after_relation_cse,
4789            };
4790            let features = features_extracted.into_values(scx.catalog);
4791            let availability_zones = if availability_zones.is_empty() {
4792                None
4793            } else {
4794                Some(availability_zones)
4795            };
4796            let (introspection_interval, introspection_debugging) =
4797                unplan_compute_replica_config(compute);
4798            // Replication factor cannot be explicitly specified with a refresh schedule;
4799            // it is always 1 or less.
4800            let replication_factor = match &schedule {
4801                ClusterScheduleOptionValue::Manual => Some(replication_factor),
4802                ClusterScheduleOptionValue::Refresh { .. } => {
4803                    assert!(
4804                        replication_factor <= 1,
4805                        "replication factor, {replication_factor:?}, must be <= 1"
4806                    );
4807                    None
4808                }
4809            };
4810            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4811            let options_extracted = ClusterOptionExtracted {
4812                // Seen is ignored when unplanning.
4813                seen: Default::default(),
4814                availability_zones,
4815                disk: Some(disk),
4816                introspection_debugging: Some(introspection_debugging),
4817                introspection_interval,
4818                managed: Some(true),
4819                replicas: None,
4820                replication_factor,
4821                size: Some(size),
4822                schedule: Some(schedule),
4823                workload_class,
4824            };
4825            let options = options_extracted.into_values(scx.catalog);
4826            let name = Ident::new_unchecked(name);
4827            Ok(CreateClusterStatement {
4828                name,
4829                options,
4830                features,
4831            })
4832        }
4833        CreateClusterVariant::Unmanaged(_) => {
4834            bail_unsupported!("SHOW CREATE for unmanaged clusters")
4835        }
4836    }
4837}
4838
4839generate_extracted_config!(
4840    ReplicaOption,
4841    (AvailabilityZone, String),
4842    (BilledAs, String),
4843    (ComputeAddresses, Vec<String>),
4844    (ComputectlAddresses, Vec<String>),
4845    (Disk, bool),
4846    (Internal, bool, Default(false)),
4847    (IntrospectionDebugging, bool, Default(false)),
4848    (IntrospectionInterval, OptionalDuration),
4849    (Size, String),
4850    (StorageAddresses, Vec<String>),
4851    (StoragectlAddresses, Vec<String>),
4852    (Workers, u16)
4853);
4854
4855fn plan_replica_config(
4856    scx: &StatementContext,
4857    options: Vec<ReplicaOption<Aug>>,
4858) -> Result<ReplicaConfig, PlanError> {
4859    let ReplicaOptionExtracted {
4860        availability_zone,
4861        billed_as,
4862        compute_addresses,
4863        computectl_addresses,
4864        disk: disk_in,
4865        internal,
4866        introspection_debugging,
4867        introspection_interval,
4868        size,
4869        storage_addresses,
4870        storagectl_addresses,
4871        workers,
4872        ..
4873    }: ReplicaOptionExtracted = options.try_into()?;
4874
4875    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4876
4877    if disk_in.is_some() {
4878        scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
4879    }
4880
4881    match (
4882        size,
4883        availability_zone,
4884        billed_as,
4885        storagectl_addresses,
4886        storage_addresses,
4887        computectl_addresses,
4888        compute_addresses,
4889        workers,
4890    ) {
4891        // Common cases we expect end users to hit.
4892        (None, _, None, None, None, None, None, None) => {
4893            // We don't mention the unmanaged options in the error message
4894            // because they are only available in unsafe mode.
4895            sql_bail!("SIZE option must be specified");
4896        }
4897        (Some(size), availability_zone, billed_as, None, None, None, None, None) => {
4898            let disk_default = scx.catalog.system_vars().disk_cluster_replicas_default();
4899            let mut disk = disk_in.unwrap_or(disk_default);
4900
4901            // HACK(benesch): disk is always enabled for v2 cluster sizes, and
4902            // it is an error to specify `DISK = FALSE` or `DISK = TRUE`
4903            // explicitly.
4904            //
4905            // The long term plan is to phase out the v1 cluster sizes, at which
4906            // point we'll be able to remove the `DISK` option entirely and
4907            // simply always enable disk.
4908            if scx.catalog.is_cluster_size_cc(&size) {
4909                if disk_in.is_some() {
4910                    sql_bail!(
4911                        "DISK option not supported for non-legacy cluster sizes because disk is always enabled"
4912                    );
4913                }
4914                disk = true;
4915            }
4916
4917            Ok(ReplicaConfig::Orchestrated {
4918                size,
4919                availability_zone,
4920                compute,
4921                disk,
4922                billed_as,
4923                internal,
4924            })
4925        }
4926
4927        (
4928            None,
4929            None,
4930            None,
4931            storagectl_addresses,
4932            storage_addresses,
4933            computectl_addresses,
4934            compute_addresses,
4935            workers,
4936        ) => {
4937            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
4938
4939            // When manually testing Materialize in unsafe mode, it's easy to
4940            // accidentally omit one of these options, so we try to produce
4941            // helpful error messages.
4942            let Some(storagectl_addrs) = storagectl_addresses else {
4943                sql_bail!("missing STORAGECTL ADDRESSES option");
4944            };
4945            let Some(storage_addrs) = storage_addresses else {
4946                sql_bail!("missing STORAGE ADDRESSES option");
4947            };
4948            let Some(computectl_addrs) = computectl_addresses else {
4949                sql_bail!("missing COMPUTECTL ADDRESSES option");
4950            };
4951            let Some(compute_addrs) = compute_addresses else {
4952                sql_bail!("missing COMPUTE ADDRESSES option");
4953            };
4954            let workers = workers.unwrap_or(1);
4955
4956            if computectl_addrs.len() != compute_addrs.len() {
4957                sql_bail!("COMPUTECTL ADDRESSES and COMPUTE ADDRESSES must have the same length");
4958            }
4959            if storagectl_addrs.len() != storage_addrs.len() {
4960                sql_bail!("STORAGECTL ADDRESSES and STORAGE ADDRESSES must have the same length");
4961            }
4962            if storagectl_addrs.len() != computectl_addrs.len() {
4963                sql_bail!(
4964                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
4965                );
4966            }
4967
4968            if workers == 0 {
4969                sql_bail!("WORKERS must be greater than 0");
4970            }
4971
4972            if disk_in.is_some() {
4973                sql_bail!("DISK can't be specified for unorchestrated clusters");
4974            }
4975
4976            Ok(ReplicaConfig::Unorchestrated {
4977                storagectl_addrs,
4978                storage_addrs,
4979                computectl_addrs,
4980                compute_addrs,
4981                workers: workers.into(),
4982                compute,
4983            })
4984        }
4985        _ => {
4986            // We don't bother trying to produce a more helpful error message
4987            // here because no user is likely to hit this path.
4988            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
4989        }
4990    }
4991}
4992
4993/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
4994///
4995/// The reverse of [`unplan_compute_replica_config`].
4996fn plan_compute_replica_config(
4997    introspection_interval: Option<OptionalDuration>,
4998    introspection_debugging: bool,
4999) -> Result<ComputeReplicaConfig, PlanError> {
5000    let introspection_interval = introspection_interval
5001        .map(|OptionalDuration(i)| i)
5002        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5003    let introspection = match introspection_interval {
5004        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5005            interval,
5006            debugging: introspection_debugging,
5007        }),
5008        None if introspection_debugging => {
5009            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5010        }
5011        None => None,
5012    };
5013    let compute = ComputeReplicaConfig { introspection };
5014    Ok(compute)
5015}
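// The three cases above, spelled out:
//   - INTROSPECTION INTERVAL omitted: introspection is enabled at
//     `DEFAULT_REPLICA_LOGGING_INTERVAL`.
//   - INTROSPECTION INTERVAL given but disabled (an `OptionalDuration`
//     holding `None`): introspection is off, and combining this with
//     INTROSPECTION DEBUGGING is an error.
//   - INTROSPECTION INTERVAL given as a duration: introspection runs at that
//     interval, with debugging as requested.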
5016
5017/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5018///
5019/// The reverse of [`plan_compute_replica_config`].
5020fn unplan_compute_replica_config(
5021    compute_replica_config: ComputeReplicaConfig,
5022) -> (Option<OptionalDuration>, bool) {
5023    match compute_replica_config.introspection {
5024        Some(ComputeReplicaIntrospectionConfig {
5025            debugging,
5026            interval,
5027        }) => (Some(OptionalDuration(Some(interval))), debugging),
5028        None => (Some(OptionalDuration(None)), false),
5029    }
5030}
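// Note that unplanning always yields an explicit INTROSPECTION INTERVAL
// (`Some(...)`): an omitted option does not round-trip back to "omitted",
// only to an equivalent explicit value.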
5031
5032/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5033///
5034/// The reverse of [`unplan_cluster_schedule`].
5035fn plan_cluster_schedule(
5036    schedule: ClusterScheduleOptionValue,
5037) -> Result<ClusterSchedule, PlanError> {
5038    Ok(match schedule {
5039        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5040        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5041        ClusterScheduleOptionValue::Refresh {
5042            hydration_time_estimate: None,
5043        } => ClusterSchedule::Refresh {
5044            hydration_time_estimate: Duration::from_millis(0),
5045        },
5046        // Otherwise we convert the `IntervalValue` to a `Duration`.
5047        ClusterScheduleOptionValue::Refresh {
5048            hydration_time_estimate: Some(interval_value),
5049        } => {
5050            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5051            if interval.as_microseconds() < 0 {
5052                sql_bail!(
5053                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5054                    interval
5055                );
5056            }
5057            if interval.months != 0 {
5058                // This limitation exists because we want this interval to be cleanly
5059                // convertible to a Unix epoch timestamp difference. That no longer holds
5060                // once the interval involves months, because months have variable lengths.
5061                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5062            }
5063            let duration = interval.duration()?;
5064            if u64::try_from(duration.as_millis()).is_err()
5065                || Interval::from_duration(&duration).is_err()
5066            {
5067                sql_bail!("HYDRATION TIME ESTIMATE too large");
5068            }
5069            ClusterSchedule::Refresh {
5070                hydration_time_estimate: duration,
5071            }
5072        }
5073    })
5074}
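// Illustrative outcomes of the conversion above (a sketch of the option
// syntax):
//   SCHEDULE = ON REFRESH                                       -- estimate 0
//   SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour')  -- estimate 1h
//   SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 month') -- error: months vary in length
//   SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '-1 hour') -- error: must be non-negative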
5075
5076/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5077///
5078/// The reverse of [`plan_cluster_schedule`].
5079fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5080    match schedule {
5081        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5082        ClusterSchedule::Refresh {
5083            hydration_time_estimate,
5084        } => {
5085            let interval = Interval::from_duration(&hydration_time_estimate)
5086                .expect("planning ensured that this is convertible back to Interval");
5087            let interval_value = literal::unplan_interval(&interval);
5088            ClusterScheduleOptionValue::Refresh {
5089                hydration_time_estimate: Some(interval_value),
5090            }
5091        }
5092    }
5093}
5094
5095pub fn describe_create_cluster_replica(
5096    _: &StatementContext,
5097    _: CreateClusterReplicaStatement<Aug>,
5098) -> Result<StatementDesc, PlanError> {
5099    Ok(StatementDesc::new(None))
5100}
5101
5102pub fn plan_create_cluster_replica(
5103    scx: &StatementContext,
5104    CreateClusterReplicaStatement {
5105        definition: ReplicaDefinition { name, options },
5106        of_cluster,
5107    }: CreateClusterReplicaStatement<Aug>,
5108) -> Result<Plan, PlanError> {
5109    let cluster = scx
5110        .catalog
5111        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5112    let current_replica_count = cluster.replica_ids().iter().count();
5113    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5114        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5115        return Err(PlanError::CreateReplicaFailStorageObjects {
5116            current_replica_count,
5117            internal_replica_count,
5118            hypothetical_replica_count: current_replica_count + 1,
5119        });
5120    }
5121
5122    let config = plan_replica_config(scx, options)?;
5123
5124    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5125        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5126            return Err(PlanError::MangedReplicaName(name.into_string()));
5127        }
5128    } else {
5129        ensure_cluster_is_not_managed(scx, cluster.id())?;
5130    }
5131
5132    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5133        name: normalize::ident(name),
5134        cluster_id: cluster.id(),
5135        config,
5136    }))
5137}
5138
5139pub fn describe_create_secret(
5140    _: &StatementContext,
5141    _: CreateSecretStatement<Aug>,
5142) -> Result<StatementDesc, PlanError> {
5143    Ok(StatementDesc::new(None))
5144}
5145
5146pub fn plan_create_secret(
5147    scx: &StatementContext,
5148    stmt: CreateSecretStatement<Aug>,
5149) -> Result<Plan, PlanError> {
5150    let CreateSecretStatement {
5151        name,
5152        if_not_exists,
5153        value,
5154    } = &stmt;
5155
5156    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5157    let mut create_sql_statement = stmt.clone();
5158    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5159    let create_sql =
5160        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5161    let secret_as = query::plan_secret_as(scx, value.clone())?;
5162
5163    let secret = Secret {
5164        create_sql,
5165        secret_as,
5166    };
5167
5168    Ok(Plan::CreateSecret(CreateSecretPlan {
5169        name,
5170        secret,
5171        if_not_exists: *if_not_exists,
5172    }))
5173}
5174
5175pub fn describe_create_connection(
5176    _: &StatementContext,
5177    _: CreateConnectionStatement<Aug>,
5178) -> Result<StatementDesc, PlanError> {
5179    Ok(StatementDesc::new(None))
5180}
5181
5182generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5183
5184pub fn plan_create_connection(
5185    scx: &StatementContext,
5186    mut stmt: CreateConnectionStatement<Aug>,
5187) -> Result<Plan, PlanError> {
5188    let CreateConnectionStatement {
5189        name,
5190        connection_type,
5191        values,
5192        if_not_exists,
5193        with_options,
5194    } = stmt.clone();
5195    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5196    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5197    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5198
5199    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5200    if options.validate.is_some() {
5201        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5202    }
5203    let validate = match options.validate {
5204        Some(val) => val,
5205        None => {
5206            scx.catalog
5207                .system_vars()
5208                .enable_default_connection_validation()
5209                && details.to_connection().validate_by_default()
5210        }
5211    };
5212
5213    // Check for an object in the catalog with this same name
5214    let full_name = scx.catalog.resolve_full_name(&name);
5215    let partial_name = PartialItemName::from(full_name.clone());
5216    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5217        return Err(PlanError::ItemAlreadyExists {
5218            name: full_name.to_string(),
5219            item_type: item.item_type(),
5220        });
5221    }
5222
5223    // For SSH connections, overwrite the public key options based on the
5224    // connection details, in case we generated new keys during planning.
5225    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5226        stmt.values.retain(|v| {
5227            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5228        });
5229        stmt.values.push(ConnectionOption {
5230            name: ConnectionOptionName::PublicKey1,
5231            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5232        });
5233        stmt.values.push(ConnectionOption {
5234            name: ConnectionOptionName::PublicKey2,
5235            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5236        });
5237    }
5238    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5239
5240    let plan = CreateConnectionPlan {
5241        name,
5242        if_not_exists,
5243        connection: crate::plan::Connection {
5244            create_sql,
5245            details,
5246        },
5247        validate,
5248    };
5249    Ok(Plan::CreateConnection(plan))
5250}
5251
5252fn plan_drop_database(
5253    scx: &StatementContext,
5254    if_exists: bool,
5255    name: &UnresolvedDatabaseName,
5256    cascade: bool,
5257) -> Result<Option<DatabaseId>, PlanError> {
5258    Ok(match resolve_database(scx, name, if_exists)? {
5259        Some(database) => {
5260            if !cascade && database.has_schemas() {
5261                sql_bail!(
5262                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5263                    name,
5264                );
5265            }
5266            Some(database.id())
5267        }
5268        None => None,
5269    })
5270}
5271
5272pub fn describe_drop_objects(
5273    _: &StatementContext,
5274    _: DropObjectsStatement,
5275) -> Result<StatementDesc, PlanError> {
5276    Ok(StatementDesc::new(None))
5277}
5278
5279pub fn plan_drop_objects(
5280    scx: &mut StatementContext,
5281    DropObjectsStatement {
5282        object_type,
5283        if_exists,
5284        names,
5285        cascade,
5286    }: DropObjectsStatement,
5287) -> Result<Plan, PlanError> {
5288    assert_ne!(
5289        object_type,
5290        mz_sql_parser::ast::ObjectType::Func,
5291        "rejected in parser"
5292    );
5293    let object_type = object_type.into();
5294
5295    let mut referenced_ids = Vec::new();
5296    for name in names {
5297        let id = match &name {
5298            UnresolvedObjectName::Cluster(name) => {
5299                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5300            }
5301            UnresolvedObjectName::ClusterReplica(name) => {
5302                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5303            }
5304            UnresolvedObjectName::Database(name) => {
5305                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5306            }
5307            UnresolvedObjectName::Schema(name) => {
5308                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5309            }
5310            UnresolvedObjectName::Role(name) => {
5311                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5312            }
5313            UnresolvedObjectName::Item(name) => {
5314                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5315                    .map(ObjectId::Item)
5316            }
5317            UnresolvedObjectName::NetworkPolicy(name) => {
5318                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5319            }
5320        };
5321        match id {
5322            Some(id) => referenced_ids.push(id),
5323            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5324                name: name.to_ast_string_simple(),
5325                object_type,
5326            }),
5327        }
5328    }
5329    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5330
5331    Ok(Plan::DropObjects(DropObjectsPlan {
5332        referenced_ids,
5333        drop_ids,
5334        object_type,
5335    }))
5336}
5337
5338fn plan_drop_schema(
5339    scx: &StatementContext,
5340    if_exists: bool,
5341    name: &UnresolvedSchemaName,
5342    cascade: bool,
5343) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5344    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5345        Some((database_spec, schema_spec)) => {
5346            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5347                sql_bail!(
5348                    "cannot drop schema {name} because it is required by the database system",
5349                );
5350            }
5351            if let SchemaSpecifier::Temporary = schema_spec {
5352                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5353            }
5354            let schema = scx.get_schema(&database_spec, &schema_spec);
5355            if !cascade && schema.has_items() {
5356                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5357                sql_bail!(
5358                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5359                    full_schema_name
5360                );
5361            }
5362            Some((database_spec, schema_spec))
5363        }
5364        None => None,
5365    })
5366}
5367
5368fn plan_drop_role(
5369    scx: &StatementContext,
5370    if_exists: bool,
5371    name: &Ident,
5372) -> Result<Option<RoleId>, PlanError> {
5373    match scx.catalog.resolve_role(name.as_str()) {
5374        Ok(role) => {
5375            let id = role.id();
5376            if &id == scx.catalog.active_role_id() {
5377                sql_bail!("current role cannot be dropped");
5378            }
5379            for role in scx.catalog.get_roles() {
5380                for (member_id, grantor_id) in role.membership() {
5381                    if &id == grantor_id {
5382                        let member_role = scx.catalog.get_role(member_id);
5383                        sql_bail!(
5384                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
5385                            name.as_str(),
5386                            role.name(),
5387                            member_role.name()
5388                        );
5389                    }
5390                }
5391            }
5392            Ok(Some(role.id()))
5393        }
5394        Err(_) if if_exists => Ok(None),
5395        Err(e) => Err(e.into()),
5396    }
5397}
5398
5399fn plan_drop_cluster(
5400    scx: &StatementContext,
5401    if_exists: bool,
5402    name: &Ident,
5403    cascade: bool,
5404) -> Result<Option<ClusterId>, PlanError> {
5405    Ok(match resolve_cluster(scx, name, if_exists)? {
5406        Some(cluster) => {
5407            if !cascade && !cluster.bound_objects().is_empty() {
5408                return Err(PlanError::DependentObjectsStillExist {
5409                    object_type: "cluster".to_string(),
5410                    object_name: cluster.name().to_string(),
5411                    dependents: Vec::new(),
5412                });
5413            }
5414            Some(cluster.id())
5415        }
5416        None => None,
5417    })
5418}
5419
5420fn plan_drop_network_policy(
5421    scx: &StatementContext,
5422    if_exists: bool,
5423    name: &Ident,
5424) -> Result<Option<NetworkPolicyId>, PlanError> {
5425    match scx.catalog.resolve_network_policy(name.as_str()) {
5426        Ok(policy) => {
5427            // TODO(network_policy): When we support role based network policies, check if any role
5428            // currently has the specified policy set.
5429            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5430                Err(PlanError::NetworkPolicyInUse)
5431            } else {
5432                Ok(Some(policy.id()))
5433            }
5434        }
5435        Err(_) if if_exists => Ok(None),
5436        Err(e) => Err(e.into()),
5437    }
5438}
5439
5440/// Returns `true` if the cluster has any object that requires a single replica.
5441/// Returns `false` if the cluster has no objects.
5442fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5443    // If this feature is enabled then all objects support multiple-replicas
5444    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5445        false
5446    } else {
5447        // Otherwise we check for the existence of sources or sinks
5448        cluster.bound_objects().iter().any(|id| {
5449            let item = scx.catalog.get_item(id);
5450            matches!(
5451                item.item_type(),
5452                CatalogItemType::Sink | CatalogItemType::Source
5453            )
5454        })
5455    }
5456}
5457
5458fn plan_drop_cluster_replica(
5459    scx: &StatementContext,
5460    if_exists: bool,
5461    name: &QualifiedReplica,
5462) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5463    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5464    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5465}
5466
5467/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5468fn plan_drop_item(
5469    scx: &StatementContext,
5470    object_type: ObjectType,
5471    if_exists: bool,
5472    name: UnresolvedItemName,
5473    cascade: bool,
5474) -> Result<Option<CatalogItemId>, PlanError> {
5475    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5476        Ok(r) => r,
5477        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5478        Err(PlanError::MismatchedObjectType {
5479            name,
5480            is_type: ObjectType::MaterializedView,
5481            expected_type: ObjectType::View,
5482        }) => {
5483            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5484        }
5485        e => e?,
5486    };
5487
5488    Ok(match resolved {
5489        Some(catalog_item) => {
5490            if catalog_item.id().is_system() {
5491                sql_bail!(
5492                    "cannot drop {} {} because it is required by the database system",
5493                    catalog_item.item_type(),
5494                    scx.catalog.minimal_qualification(catalog_item.name()),
5495                );
5496            }
5497
5498            if !cascade {
5499                for id in catalog_item.used_by() {
5500                    let dep = scx.catalog.get_item(id);
5501                    if dependency_prevents_drop(object_type, dep) {
5502                        return Err(PlanError::DependentObjectsStillExist {
5503                            object_type: catalog_item.item_type().to_string(),
5504                            object_name: scx
5505                                .catalog
5506                                .minimal_qualification(catalog_item.name())
5507                                .to_string(),
5508                            dependents: vec![(
5509                                dep.item_type().to_string(),
5510                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5511                            )],
5512                        });
5513                    }
5514                }
5515                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5516                //  relies on this entry. Unfortunately, we don't have that information readily available.
5517            }
5518            Some(catalog_item.id())
5519        }
5520        None => None,
5521    })
5522}
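// Sketch of the resulting behavior (hypothetical names): `DROP VIEW v` fails
// with `DependentObjectsStillExist` if, say, a materialized view uses `v`,
// while `DROP VIEW v CASCADE` skips the dependency check and returns the id so
// the dependents can be dropped along with it.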
5523
5524/// Does the dependency `dep` prevent a non-cascading drop of an object of type `object_type`?
5525fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5526    match object_type {
5527        ObjectType::Type => true,
5528        _ => match dep.item_type() {
5529            CatalogItemType::Func
5530            | CatalogItemType::Table
5531            | CatalogItemType::Source
5532            | CatalogItemType::View
5533            | CatalogItemType::MaterializedView
5534            | CatalogItemType::Sink
5535            | CatalogItemType::Type
5536            | CatalogItemType::Secret
5537            | CatalogItemType::Connection
5538            | CatalogItemType::ContinualTask => true,
5539            CatalogItemType::Index => false,
5540        },
5541    }
5542}
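// For example (hypothetical names): an index on view `v` does not block a
// plain `DROP VIEW v`, but a sink reading from `v` does. Types are stricter:
// any dependent, including an index, blocks a non-cascading `DROP TYPE`.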
5543
5544pub fn describe_alter_index_options(
5545    _: &StatementContext,
5546    _: AlterIndexStatement<Aug>,
5547) -> Result<StatementDesc, PlanError> {
5548    Ok(StatementDesc::new(None))
5549}
5550
5551pub fn describe_drop_owned(
5552    _: &StatementContext,
5553    _: DropOwnedStatement<Aug>,
5554) -> Result<StatementDesc, PlanError> {
5555    Ok(StatementDesc::new(None))
5556}
5557
5558pub fn plan_drop_owned(
5559    scx: &StatementContext,
5560    drop: DropOwnedStatement<Aug>,
5561) -> Result<Plan, PlanError> {
5562    let cascade = drop.cascade();
5563    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5564    let mut drop_ids = Vec::new();
5565    let mut privilege_revokes = Vec::new();
5566    let mut default_privilege_revokes = Vec::new();
5567
5568    fn update_privilege_revokes(
5569        object_id: SystemObjectId,
5570        privileges: &PrivilegeMap,
5571        role_ids: &BTreeSet<RoleId>,
5572        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5573    ) {
5574        privilege_revokes.extend(iter::zip(
5575            iter::repeat(object_id),
5576            privileges
5577                .all_values()
5578                .filter(|privilege| role_ids.contains(&privilege.grantee))
5579                .cloned(),
5580        ));
5581    }
5582
5583    // Replicas
5584    for replica in scx.catalog.get_cluster_replicas() {
5585        if role_ids.contains(&replica.owner_id()) {
5586            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5587        }
5588    }
5589
5590    // Clusters
5591    for cluster in scx.catalog.get_clusters() {
5592        if role_ids.contains(&cluster.owner_id()) {
5593            // Note: CASCADE is not required for replicas.
5594            if !cascade {
5595                let non_owned_bound_objects: Vec<_> = cluster
5596                    .bound_objects()
5597                    .into_iter()
5598                    .map(|item_id| scx.catalog.get_item(item_id))
5599                    .filter(|item| !role_ids.contains(&item.owner_id()))
5600                    .collect();
5601                if !non_owned_bound_objects.is_empty() {
5602                    let names: Vec<_> = non_owned_bound_objects
5603                        .into_iter()
5604                        .map(|item| {
5605                            (
5606                                item.item_type().to_string(),
5607                                scx.catalog.resolve_full_name(item.name()).to_string(),
5608                            )
5609                        })
5610                        .collect();
5611                    return Err(PlanError::DependentObjectsStillExist {
5612                        object_type: "cluster".to_string(),
5613                        object_name: cluster.name().to_string(),
5614                        dependents: names,
5615                    });
5616                }
5617            }
5618            drop_ids.push(cluster.id().into());
5619        }
5620        update_privilege_revokes(
5621            SystemObjectId::Object(cluster.id().into()),
5622            cluster.privileges(),
5623            &role_ids,
5624            &mut privilege_revokes,
5625        );
5626    }
5627
5628    // Items
5629    for item in scx.catalog.get_items() {
5630        if role_ids.contains(&item.owner_id()) {
5631            if !cascade {
5632                // Checks if any items still depend on this one, returning an error if so.
5633                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5634                    let non_owned_dependencies: Vec<_> = used_by
5635                        .into_iter()
5636                        .map(|item_id| scx.catalog.get_item(item_id))
5637                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5638                        .filter(|item| !role_ids.contains(&item.owner_id()))
5639                        .collect();
5640                    if !non_owned_dependencies.is_empty() {
5641                        let names: Vec<_> = non_owned_dependencies
5642                            .into_iter()
5643                            .map(|item| {
5644                                let item_typ = item.item_type().to_string();
5645                                let item_name =
5646                                    scx.catalog.resolve_full_name(item.name()).to_string();
5647                                (item_typ, item_name)
5648                            })
5649                            .collect();
5650                        Err(PlanError::DependentObjectsStillExist {
5651                            object_type: item.item_type().to_string(),
5652                            object_name: scx
5653                                .catalog
5654                                .resolve_full_name(item.name())
5655                                .to_string(),
5657                            dependents: names,
5658                        })
5659                    } else {
5660                        Ok(())
5661                    }
5662                };
5663
5664                // When this item gets dropped it will also drop its progress source, so we need to
5665                // check the users of that progress source, too.
5666                if let Some(id) = item.progress_id() {
5667                    let progress_item = scx.catalog.get_item(&id);
5668                    check_if_dependents_exist(progress_item.used_by())?;
5669                }
5670                check_if_dependents_exist(item.used_by())?;
5671            }
5672            drop_ids.push(item.id().into());
5673        }
5674        update_privilege_revokes(
5675            SystemObjectId::Object(item.id().into()),
5676            item.privileges(),
5677            &role_ids,
5678            &mut privilege_revokes,
5679        );
5680    }
5681
5682    // Schemas
5683    for schema in scx.catalog.get_schemas() {
5684        if !schema.id().is_temporary() {
5685            if role_ids.contains(&schema.owner_id()) {
5686                if !cascade {
5687                    let non_owned_dependencies: Vec<_> = schema
5688                        .item_ids()
5689                        .map(|item_id| scx.catalog.get_item(&item_id))
5690                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5691                        .filter(|item| !role_ids.contains(&item.owner_id()))
5692                        .collect();
5693                    if !non_owned_dependencies.is_empty() {
5694                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5695                        sql_bail!(
5696                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5697                            full_schema_name.to_string().quoted()
5698                        );
5699                    }
5700                }
5701                drop_ids.push((*schema.database(), *schema.id()).into())
5702            }
5703            update_privilege_revokes(
5704                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5705                schema.privileges(),
5706                &role_ids,
5707                &mut privilege_revokes,
5708            );
5709        }
5710    }
5711
5712    // Databases
5713    for database in scx.catalog.get_databases() {
5714        if role_ids.contains(&database.owner_id()) {
5715            if !cascade {
5716                let non_owned_schemas: Vec<_> = database
5717                    .schemas()
5718                    .into_iter()
5719                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5720                    .collect();
5721                if !non_owned_schemas.is_empty() {
5722                    sql_bail!(
5723                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5724                        database.name().quoted(),
5725                    );
5726                }
5727            }
5728            drop_ids.push(database.id().into());
5729        }
5730        update_privilege_revokes(
5731            SystemObjectId::Object(database.id().into()),
5732            database.privileges(),
5733            &role_ids,
5734            &mut privilege_revokes,
5735        );
5736    }
5737
5738    // System
5739    update_privilege_revokes(
5740        SystemObjectId::System,
5741        scx.catalog.get_system_privileges(),
5742        &role_ids,
5743        &mut privilege_revokes,
5744    );
5745
5746    for (default_privilege_object, default_privilege_acl_items) in
5747        scx.catalog.get_default_privileges()
5748    {
5749        for default_privilege_acl_item in default_privilege_acl_items {
5750            if role_ids.contains(&default_privilege_object.role_id)
5751                || role_ids.contains(&default_privilege_acl_item.grantee)
5752            {
5753                default_privilege_revokes.push((
5754                    default_privilege_object.clone(),
5755                    default_privilege_acl_item.clone(),
5756                ));
5757            }
5758        }
5759    }
5760
5761    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5762
5763    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5764    if !system_ids.is_empty() {
5765        let mut owners = system_ids
5766            .into_iter()
5767            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5768            .collect::<BTreeSet<_>>()
5769            .into_iter()
5770            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5771        sql_bail!(
5772            "cannot drop objects owned by role {} because they are required by the database system",
5773            owners.join(", "),
5774        );
5775    }
5776
5777    Ok(Plan::DropOwned(DropOwnedPlan {
5778        role_ids: role_ids.into_iter().collect(),
5779        drop_ids,
5780        privilege_revokes,
5781        default_privilege_revokes,
5782    }))
5783}
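// Illustrative usage (hypothetical role name; a sketch, not a transcript):
//
//     DROP OWNED BY qa;           -- errors if non-owned objects depend on qa's
//     DROP OWNED BY qa CASCADE;   -- also drops the dependent objects
//
// Both forms additionally revoke all privileges granted to `qa`, including
// default privileges, as collected above.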
5784
5785fn plan_retain_history_option(
5786    scx: &StatementContext,
5787    retain_history: Option<OptionalDuration>,
5788) -> Result<Option<CompactionWindow>, PlanError> {
5789    if let Some(OptionalDuration(lcw)) = retain_history {
5790        Ok(Some(plan_retain_history(scx, lcw)?))
5791    } else {
5792        Ok(None)
5793    }
5794}
5795
5796// Convert a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5797// `DisableCompaction`. A zero duration is an internal error, because the `OptionalDuration` type
5798// already converts a zero duration into `None`. This function must not be called in the `RESET
5799// (RETAIN HISTORY)` path, which should be handled by the outer `Option<OptionalDuration>` being
5800// `None`.
5801fn plan_retain_history(
5802    scx: &StatementContext,
5803    lcw: Option<Duration>,
5804) -> Result<CompactionWindow, PlanError> {
5805    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5806    match lcw {
5807        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
5808        // disable compaction), and should never occur here. Furthermore, some things actually do
5809        // break when this is set to real zero:
5810        // https://github.com/MaterializeInc/database-issues/issues/3798.
5811        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5812            option_name: "RETAIN HISTORY".to_string(),
5813            err: Box::new(PlanError::Unstructured(
5814                "internal error: unexpectedly zero".to_string(),
5815            )),
5816        }),
5817        Some(duration) => {
5818            // Error if the duration is low and enable_unlimited_retain_history is not set (which
5819            // should only be possible during testing).
5820            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5821                && scx
5822                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5823                    .is_err()
5824            {
5825                return Err(PlanError::RetainHistoryLow {
5826                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5827                });
5828            }
5829            Ok(duration.try_into()?)
5830        }
5831        // In the past, `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction is
5832        // usually a bad choice, so prevent it unless the unlimited retain history flag is enabled.
5833        None => {
5834            if scx
5835                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5836                .is_err()
5837            {
5838                Err(PlanError::RetainHistoryRequired)
5839            } else {
5840                Ok(CompactionWindow::DisableCompaction)
5841            }
5842        }
5843    }
5844}
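// Rough mapping, assuming the relevant feature flags are enabled (durations are
// illustrative):
//
//     RETAIN HISTORY FOR '1hr'    -> CompactionWindow of one hour
//     RETAIN HISTORY FOR '2 days' -> CompactionWindow of two days
//     RETAIN HISTORY FOR '0'      -> parsed to `None` by `OptionalDuration`,
//                                    which maps to `DisableCompaction` (gated)
//
// Durations below the default compaction window error unless
// ENABLE_UNLIMITED_RETAIN_HISTORY is set.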
5845
5846generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5847
5848fn plan_index_options(
5849    scx: &StatementContext,
5850    with_opts: Vec<IndexOption<Aug>>,
5851) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5852    if !with_opts.is_empty() {
5853        // Index options are not durable.
5854        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5855    }
5856
5857    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5858
5859    let mut out = Vec::with_capacity(1);
5860    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5861        out.push(crate::plan::IndexOption::RetainHistory(cw));
5862    }
5863    Ok(out)
5864}
5865
5866generate_extracted_config!(
5867    TableOption,
5868    (PartitionBy, Vec<Ident>),
5869    (RetainHistory, OptionalDuration),
5870    (RedactedTest, String)
5871);
5872
5873fn plan_table_options(
5874    scx: &StatementContext,
5875    desc: &RelationDesc,
5876    with_opts: Vec<TableOption<Aug>>,
5877) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5878    let TableOptionExtracted {
5879        partition_by,
5880        retain_history,
5881        redacted_test,
5882        ..
5883    }: TableOptionExtracted = with_opts.try_into()?;
5884
5885    if let Some(partition_by) = partition_by {
5886        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5887        check_partition_by(desc, partition_by)?;
5888    }
5889
5890    if redacted_test.is_some() {
5891        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5892    }
5893
5894    let mut out = Vec::with_capacity(1);
5895    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5896        out.push(crate::plan::TableOption::RetainHistory(cw));
5897    }
5898    Ok(out)
5899}
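// A sketch of plausible WITH options handled above (hypothetical table name;
// PARTITION BY is gated behind ENABLE_COLLECTION_PARTITION_BY):
//
//     CREATE TABLE t (a int, b text) WITH (PARTITION BY (a), RETAIN HISTORY FOR '1hr');
//
// Note that only RETAIN HISTORY flows into the returned plan options here;
// PARTITION BY and REDACTED TEST are validated but not emitted.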
5900
5901pub fn plan_alter_index_options(
5902    scx: &mut StatementContext,
5903    AlterIndexStatement {
5904        index_name,
5905        if_exists,
5906        action,
5907    }: AlterIndexStatement<Aug>,
5908) -> Result<Plan, PlanError> {
5909    let object_type = ObjectType::Index;
5910    match action {
5911        AlterIndexAction::ResetOptions(options) => {
5912            let mut options = options.into_iter();
5913            if let Some(opt) = options.next() {
5914                match opt {
5915                    IndexOptionName::RetainHistory => {
5916                        if options.next().is_some() {
5917                            sql_bail!("RETAIN HISTORY must be the only option");
5918                        }
5919                        return alter_retain_history(
5920                            scx,
5921                            object_type,
5922                            if_exists,
5923                            UnresolvedObjectName::Item(index_name),
5924                            None,
5925                        );
5926                    }
5927                }
5928            }
5929            sql_bail!("expected option");
5930        }
5931        AlterIndexAction::SetOptions(options) => {
5932            let mut options = options.into_iter();
5933            if let Some(opt) = options.next() {
5934                match opt.name {
5935                    IndexOptionName::RetainHistory => {
5936                        if options.next().is_some() {
5937                            sql_bail!("RETAIN HISTORY must be the only option");
5938                        }
5939                        return alter_retain_history(
5940                            scx,
5941                            object_type,
5942                            if_exists,
5943                            UnresolvedObjectName::Item(index_name),
5944                            opt.value,
5945                        );
5946                    }
5947                }
5948            }
5949            sql_bail!("expected option");
5950        }
5951    }
5952}
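// Statements of the shape handled above (hypothetical index name):
//
//     ALTER INDEX i SET (RETAIN HISTORY FOR '30m');
//     ALTER INDEX i RESET (RETAIN HISTORY);
//
// Both forms funnel into `alter_retain_history`; RETAIN HISTORY must be the
// only option in the list.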
5953
5954pub fn describe_alter_cluster_set_options(
5955    _: &StatementContext,
5956    _: AlterClusterStatement<Aug>,
5957) -> Result<StatementDesc, PlanError> {
5958    Ok(StatementDesc::new(None))
5959}
5960
5961pub fn plan_alter_cluster(
5962    scx: &mut StatementContext,
5963    AlterClusterStatement {
5964        name,
5965        action,
5966        if_exists,
5967    }: AlterClusterStatement<Aug>,
5968) -> Result<Plan, PlanError> {
5969    let cluster = match resolve_cluster(scx, &name, if_exists)? {
5970        Some(entry) => entry,
5971        None => {
5972            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5973                name: name.to_ast_string_simple(),
5974                object_type: ObjectType::Cluster,
5975            });
5976
5977            return Ok(Plan::AlterNoop(AlterNoopPlan {
5978                object_type: ObjectType::Cluster,
5979            }));
5980        }
5981    };
5982
5983    let mut options: PlanClusterOption = Default::default();
5984    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
5985
5986    match action {
5987        AlterClusterAction::SetOptions {
5988            options: set_options,
5989            with_options,
5990        } => {
5991            let ClusterOptionExtracted {
5992                availability_zones,
5993                introspection_debugging,
5994                introspection_interval,
5995                managed,
5996                replicas: replica_defs,
5997                replication_factor,
5998                seen: _,
5999                size,
6000                disk,
6001                schedule,
6002                workload_class,
6003            }: ClusterOptionExtracted = set_options.try_into()?;
6004
6005            if !scx.catalog.active_role_id().is_system() {
6006                if workload_class.is_some() {
6007                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6008                }
6009            }
6010
6011            match managed.unwrap_or_else(|| cluster.is_managed()) {
6012                true => {
6013                    let alter_strategy_extracted =
6014                        ClusterAlterOptionExtracted::try_from(with_options)?;
6015                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6016
6017                    match alter_strategy {
6018                        AlterClusterPlanStrategy::None => {}
6019                        _ => {
6020                            scx.require_feature_flag(
6021                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6022                            )?;
6023                        }
6024                    }
6025
6026                    if replica_defs.is_some() {
6027                        sql_bail!("REPLICAS not supported for managed clusters");
6028                    }
6029                    if schedule.is_some()
6030                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6031                    {
6032                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6033                    }
6034
6035                    if let Some(replication_factor) = replication_factor {
6036                        if schedule.is_some()
6037                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6038                        {
6039                            sql_bail!(
6040                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6041                            );
6042                        }
6043                        if let Some(current_schedule) = cluster.schedule() {
6044                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6045                                sql_bail!(
6046                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6047                                );
6048                            }
6049                        }
6050
6051                        let internal_replica_count =
6052                            cluster.replicas().iter().filter(|r| r.internal()).count();
6053                        let hypothetical_replica_count =
6054                            internal_replica_count + usize::cast_from(replication_factor);
6055
6056                        // Total number of replicas running is internal replicas
6057                        // + replication factor.
6058                        if contains_single_replica_objects(scx, cluster)
6059                            && hypothetical_replica_count > 1
6060                        {
6061                            return Err(PlanError::CreateReplicaFailStorageObjects {
6062                                current_replica_count: cluster.replica_ids().iter().count(),
6063                                internal_replica_count,
6064                                hypothetical_replica_count,
6065                            });
6066                        }
6067                    } else if alter_strategy.is_some() {
6068                        // `AlterClusterPlanStrategy`s other than `None` stand up pending replicas of the
6069                        // new configuration, which violates the single-replica constraint for sources.
6070                        // If the cluster hosts any storage objects (sources or sinks), just fail.
6071                        let internal_replica_count =
6072                            cluster.replicas().iter().filter(|r| r.internal()).count();
6073                        let hypothetical_replica_count = internal_replica_count * 2;
6074                        if contains_single_replica_objects(scx, cluster) {
6075                            return Err(PlanError::CreateReplicaFailStorageObjects {
6076                                current_replica_count: cluster.replica_ids().iter().count(),
6077                                internal_replica_count,
6078                                hypothetical_replica_count,
6079                            });
6080                        }
6081                    }
6082                }
6083                false => {
6084                    if alter_strategy.is_some() {
6085                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6086                    }
6087                    if availability_zones.is_some() {
6088                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6089                    }
6090                    if replication_factor.is_some() {
6091                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6092                    }
6093                    if introspection_debugging.is_some() {
6094                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6095                    }
6096                    if introspection_interval.is_some() {
6097                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6098                    }
6099                    if size.is_some() {
6100                        sql_bail!("SIZE not supported for unmanaged clusters");
6101                    }
6102                    if disk.is_some() {
6103                        sql_bail!("DISK not supported for unmanaged clusters");
6104                    }
6105                    if schedule.is_some()
6106                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6107                    {
6108                        sql_bail!(
6109                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6110                        );
6111                    }
6112                    if let Some(current_schedule) = cluster.schedule() {
6113                        if !matches!(current_schedule, ClusterSchedule::Manual)
6114                            && schedule.is_none()
6115                        {
6116                            sql_bail!(
6117                                "when switching a cluster to unmanaged, if the managed \
6118                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6119                                explicitly set the SCHEDULE to MANUAL"
6120                            );
6121                        }
6122                    }
6123                }
6124            }
6125
6126            let mut replicas = vec![];
6127            for ReplicaDefinition { name, options } in
6128                replica_defs.into_iter().flat_map(Vec::into_iter)
6129            {
6130                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6131            }
6132
6133            if let Some(managed) = managed {
6134                options.managed = AlterOptionParameter::Set(managed);
6135            }
6136            if let Some(replication_factor) = replication_factor {
6137                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6138            }
6139            if let Some(size) = &size {
6140                // HACK(benesch): disk is always enabled for v2 cluster sizes,
6141                // and it is an error to specify `DISK = FALSE` or `DISK = TRUE`
6142                // explicitly.
6143                //
6144                // The long term plan is to phase out the v1 cluster sizes, at
6145                // which point we'll be able to remove the `DISK` option
6146                // entirely and simply always enable disk.
6147                if scx.catalog.is_cluster_size_cc(size) {
6148                    if disk.is_some() {
6149                        sql_bail!(
6150                            "DISK option not supported for modern cluster sizes because disk is always enabled"
6151                        );
6152                    } else {
6153                        options.disk = AlterOptionParameter::Set(true);
6154                    }
6155                }
6156                options.size = AlterOptionParameter::Set(size.clone());
6157            }
6158            if let Some(availability_zones) = availability_zones {
6159                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6160            }
6161            if let Some(introspection_debugging) = introspection_debugging {
6162                options.introspection_debugging =
6163                    AlterOptionParameter::Set(introspection_debugging);
6164            }
6165            if let Some(introspection_interval) = introspection_interval {
6166                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6167            }
6168            if let Some(disk) = disk {
6169                // HACK(benesch): disk is always enabled for v2 cluster sizes,
6170                // and it is an error to specify `DISK = FALSE` or `DISK = TRUE`
6171                // explicitly.
6172                //
6173                // The long term plan is to phase out the v1 cluster sizes, at
6174                // which point we'll be able to remove the `DISK` option
6175                // entirely and simply always enable disk.
6176                let size = size.as_deref().unwrap_or_else(|| {
6177                    cluster.managed_size().expect("cluster known to be managed")
6178                });
6179                if scx.catalog.is_cluster_size_cc(size) {
6180                    sql_bail!(
6181                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6182                    );
6183                }
6184
6185                if disk {
6186                    scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
6187                }
6188                options.disk = AlterOptionParameter::Set(disk);
6189            }
6190            if !replicas.is_empty() {
6191                options.replicas = AlterOptionParameter::Set(replicas);
6192            }
6193            if let Some(schedule) = schedule {
6194                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6195            }
6196            if let Some(workload_class) = workload_class {
6197                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6198            }
6199        }
6200        AlterClusterAction::ResetOptions(reset_options) => {
6201            use AlterOptionParameter::Reset;
6202            use ClusterOptionName::*;
6203
6204            if !scx.catalog.active_role_id().is_system() {
6205                if reset_options.contains(&WorkloadClass) {
6206                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6207                }
6208            }
6209
6210            for option in reset_options {
6211                match option {
6212                    AvailabilityZones => options.availability_zones = Reset,
6213                    Disk => options.disk = Reset,
6214                    IntrospectionInterval => options.introspection_interval = Reset,
6215                    IntrospectionDebugging => options.introspection_debugging = Reset,
6216                    Managed => options.managed = Reset,
6217                    Replicas => options.replicas = Reset,
6218                    ReplicationFactor => options.replication_factor = Reset,
6219                    Size => options.size = Reset,
6220                    Schedule => options.schedule = Reset,
6221                    WorkloadClass => options.workload_class = Reset,
6222                }
6223            }
6224        }
6225    }
6226    Ok(Plan::AlterCluster(AlterClusterPlan {
6227        id: cluster.id(),
6228        name: cluster.name().to_string(),
6229        options,
6230        strategy: alter_strategy,
6231    }))
6232}
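// A few statements that exercise the branches above (hypothetical cluster
// name; a sketch of plausible syntax, not an exhaustive list):
//
//     ALTER CLUSTER prod SET (REPLICATION FACTOR 2);
//     ALTER CLUSTER prod SET (MANAGED = false);
//     ALTER CLUSTER prod RESET (SCHEDULE);
//
// WORKLOAD CLASS is accepted only for system users, in both SET and RESET.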
6233
6234pub fn describe_alter_set_cluster(
6235    _: &StatementContext,
6236    _: AlterSetClusterStatement<Aug>,
6237) -> Result<StatementDesc, PlanError> {
6238    Ok(StatementDesc::new(None))
6239}
6240
6241pub fn plan_alter_item_set_cluster(
6242    scx: &StatementContext,
6243    AlterSetClusterStatement {
6244        if_exists,
6245        set_cluster: in_cluster_name,
6246        name,
6247        object_type,
6248    }: AlterSetClusterStatement<Aug>,
6249) -> Result<Plan, PlanError> {
6250    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6251
6252    let object_type = object_type.into();
6253
6254    // Prevent access to `SET CLUSTER` for unsupported objects.
6255    match object_type {
6256        ObjectType::MaterializedView => {}
6257        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6258            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6259        }
6260        _ => {
6261            bail_never_supported!(
6262                format!("ALTER {object_type} SET CLUSTER"),
6263                "sql/alter-set-cluster/",
6264                format!("{object_type} has no associated cluster")
6265            )
6266        }
6267    }
6268
6269    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6270
6271    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6272        Some(entry) => {
6273            let current_cluster = entry.cluster_id();
6274            let Some(current_cluster) = current_cluster else {
6275                sql_bail!("no cluster associated with {name}");
6276            };
6277
6278            if current_cluster == in_cluster.id() {
6279                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6280            } else {
6281                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6282                    id: entry.id(),
6283                    set_cluster: in_cluster.id(),
6284                }))
6285            }
6286        }
6287        None => {
6288            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6289                name: name.to_ast_string_simple(),
6290                object_type,
6291            });
6292
6293            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6294        }
6295    }
6296}
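// Currently only materialized views reach the happy path above (hypothetical
// names):
//
//     ALTER MATERIALIZED VIEW mv SET CLUSTER prod;
//
// Indexes, sinks, and sources produce an "unsupported" error, and moving an
// object to the cluster it already lives on plans a no-op.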
6297
6298pub fn describe_alter_object_rename(
6299    _: &StatementContext,
6300    _: AlterObjectRenameStatement,
6301) -> Result<StatementDesc, PlanError> {
6302    Ok(StatementDesc::new(None))
6303}
6304
6305pub fn plan_alter_object_rename(
6306    scx: &mut StatementContext,
6307    AlterObjectRenameStatement {
6308        name,
6309        object_type,
6310        to_item_name,
6311        if_exists,
6312    }: AlterObjectRenameStatement,
6313) -> Result<Plan, PlanError> {
6314    let object_type = object_type.into();
6315    match (object_type, name) {
6316        (
6317            ObjectType::View
6318            | ObjectType::MaterializedView
6319            | ObjectType::Table
6320            | ObjectType::Source
6321            | ObjectType::Index
6322            | ObjectType::Sink
6323            | ObjectType::Secret
6324            | ObjectType::Connection,
6325            UnresolvedObjectName::Item(name),
6326        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6327        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6328            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6329        }
6330        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6331            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6332        }
6333        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6334            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6335        }
6336        (object_type, name) => {
6337            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6338        }
6339    }
6340}
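// Dispatch examples (hypothetical names): `ALTER TABLE t RENAME TO t2` takes
// the item path, `ALTER CLUSTER c RENAME TO c2` the cluster path, and
// `ALTER SCHEMA s RENAME TO s2` the schema path.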
6341
6342pub fn plan_alter_schema_rename(
6343    scx: &mut StatementContext,
6344    name: UnresolvedSchemaName,
6345    to_schema_name: Ident,
6346    if_exists: bool,
6347) -> Result<Plan, PlanError> {
6348    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6349        let object_type = ObjectType::Schema;
6350        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6351            name: name.to_ast_string_simple(),
6352            object_type,
6353        });
6354        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6355    };
6356
6357    // Make sure the name is unique.
6358    if scx
6359        .resolve_schema_in_database(&db_spec, &to_schema_name)
6360        .is_ok()
6361    {
6362        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6363            to_schema_name.clone().into_string(),
6364        )));
6365    }
6366
6367    // Prevent users from renaming system related schemas.
6368    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6369    if schema.id().is_system() {
6370        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6371    }
6372
6373    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6374        cur_schema_spec: (db_spec, schema_spec),
6375        new_schema_name: to_schema_name.into_string(),
6376    }))
6377}
6378
6379pub fn plan_alter_schema_swap<F>(
6380    scx: &mut StatementContext,
6381    name_a: UnresolvedSchemaName,
6382    name_b: Ident,
6383    gen_temp_suffix: F,
6384) -> Result<Plan, PlanError>
6385where
6386    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6387{
6388    let schema_a = scx.resolve_schema(name_a.clone())?;
6389
6390    let db_spec = schema_a.database().clone();
6391    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6392        sql_bail!("cannot swap schemas that are in the ambient database");
6393    };
6394    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6395
6396    // We cannot swap system schemas.
6397    if schema_a.id().is_system() || schema_b.id().is_system() {
6398        bail_never_supported!("swapping a system schema".to_string())
6399    }
6400
6401    // Generate a temporary name we can swap schema_a to.
6402    //
6403    // 'check' returns whether the temp schema name would be valid.
6404    let check = |temp_suffix: &str| {
6405        let mut temp_name = ident!("mz_schema_swap_");
6406        temp_name.append_lossy(temp_suffix);
6407        scx.resolve_schema_in_database(&db_spec, &temp_name)
6408            .is_err()
6409    };
6410    let temp_suffix = gen_temp_suffix(&check)?;
6411    let name_temp = format!("mz_schema_swap_{temp_suffix}");
6412
6413    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6414        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6415        schema_a_name: schema_a.name().schema.to_string(),
6416        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6417        schema_b_name: schema_b.name().schema.to_string(),
6418        name_temp,
6419    }))
6420}
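// The plan carries both schema specs plus `name_temp`; conceptually the swap
// is three renames (a sketch of the idea, not the sequencer's exact steps):
// a -> temp, b -> a, temp -> b.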
6421
6422pub fn plan_alter_item_rename(
6423    scx: &mut StatementContext,
6424    object_type: ObjectType,
6425    name: UnresolvedItemName,
6426    to_item_name: Ident,
6427    if_exists: bool,
6428) -> Result<Plan, PlanError> {
6429    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6430        Ok(r) => r,
6431        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6432        Err(PlanError::MismatchedObjectType {
6433            name,
6434            is_type: ObjectType::MaterializedView,
6435            expected_type: ObjectType::View,
6436        }) => {
6437            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6438        }
6439        e => e?,
6440    };
6441
6442    match resolved {
6443        Some(entry) => {
6444            let full_name = scx.catalog.resolve_full_name(entry.name());
6445            let item_type = entry.item_type();
6446
6447            let proposed_name = QualifiedItemName {
6448                qualifiers: entry.name().qualifiers.clone(),
6449                item: to_item_name.clone().into_string(),
6450            };
6451
6452            // For PostgreSQL compatibility, items and types cannot have
6453            // overlapping names in a variety of situations. See the comment on
6454            // `CatalogItemType::conflicts_with_type` for details.
6455            let conflicting_type_exists;
6456            let conflicting_item_exists;
6457            if item_type == CatalogItemType::Type {
6458                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6459                conflicting_item_exists = scx
6460                    .catalog
6461                    .get_item_by_name(&proposed_name)
6462                    .map(|item| item.item_type().conflicts_with_type())
6463                    .unwrap_or(false);
6464            } else {
6465                conflicting_type_exists = item_type.conflicts_with_type()
6466                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6467                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6468            };
6469            if conflicting_type_exists || conflicting_item_exists {
6470                sql_bail!("catalog item '{}' already exists", to_item_name);
6471            }
6472
6473            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6474                id: entry.id(),
6475                current_full_name: full_name,
6476                to_name: normalize::ident(to_item_name),
6477                object_type,
6478            }))
6479        }
6480        None => {
6481            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6482                name: name.to_ast_string_simple(),
6483                object_type,
6484            });
6485
6486            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6487        }
6488    }
6489}
6490
6491pub fn plan_alter_cluster_rename(
6492    scx: &mut StatementContext,
6493    object_type: ObjectType,
6494    name: Ident,
6495    to_name: Ident,
6496    if_exists: bool,
6497) -> Result<Plan, PlanError> {
6498    match resolve_cluster(scx, &name, if_exists)? {
6499        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6500            id: entry.id(),
6501            name: entry.name().to_string(),
6502            to_name: normalize::ident(to_name),
6503        })),
6504        None => {
6505            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6506                name: name.to_ast_string_simple(),
6507                object_type,
6508            });
6509
6510            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6511        }
6512    }
6513}
6514
6515pub fn plan_alter_cluster_swap<F>(
6516    scx: &mut StatementContext,
6517    name_a: Ident,
6518    name_b: Ident,
6519    gen_temp_suffix: F,
6520) -> Result<Plan, PlanError>
6521where
6522    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6523{
6524    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6525    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6526
6527    let check = |temp_suffix: &str| {
6528        let mut temp_name = ident!("mz_cluster_swap_");
6529        temp_name.append_lossy(temp_suffix);
6530        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6531            // Temp name does not exist, so we can use it.
6532            Err(CatalogError::UnknownCluster(_)) => true,
6533            // Temp name already exists!
6534            Ok(_) | Err(_) => false,
6535        }
6536    };
6537    let temp_suffix = gen_temp_suffix(&check)?;
6538    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6539
6540    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6541        id_a: cluster_a.id(),
6542        id_b: cluster_b.id(),
6543        name_a: name_a.into_string(),
6544        name_b: name_b.into_string(),
6545        name_temp,
6546    }))
6547}
6548
6549pub fn plan_alter_cluster_replica_rename(
6550    scx: &mut StatementContext,
6551    object_type: ObjectType,
6552    name: QualifiedReplica,
6553    to_item_name: Ident,
6554    if_exists: bool,
6555) -> Result<Plan, PlanError> {
6556    match resolve_cluster_replica(scx, &name, if_exists)? {
6557        Some((cluster, replica)) => {
6558            ensure_cluster_is_not_managed(scx, cluster.id())?;
6559            Ok(Plan::AlterClusterReplicaRename(
6560                AlterClusterReplicaRenamePlan {
6561                    cluster_id: cluster.id(),
6562                    replica_id: replica,
6563                    name: QualifiedReplica {
6564                        cluster: Ident::new(cluster.name())?,
6565                        replica: name.replica,
6566                    },
6567                    to_name: normalize::ident(to_item_name),
6568                },
6569            ))
6570        }
6571        None => {
6572            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6573                name: name.to_ast_string_simple(),
6574                object_type,
6575            });
6576
6577            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6578        }
6579    }
6580}
6581
6582pub fn describe_alter_object_swap(
6583    _: &StatementContext,
6584    _: AlterObjectSwapStatement,
6585) -> Result<StatementDesc, PlanError> {
6586    Ok(StatementDesc::new(None))
6587}
6588
6589pub fn plan_alter_object_swap(
6590    scx: &mut StatementContext,
6591    stmt: AlterObjectSwapStatement,
6592) -> Result<Plan, PlanError> {
6593    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6594
6595    let AlterObjectSwapStatement {
6596        object_type,
6597        name_a,
6598        name_b,
6599    } = stmt;
6600    let object_type = object_type.into();
6601
6602    // We'll try 10 times to generate a temporary suffix.
6603    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6604        let mut attempts = 0;
6605        let name_temp = loop {
6606            attempts += 1;
6607            if attempts > 10 {
6608                tracing::warn!("Unable to generate temp id for swapping");
6609                sql_bail!("unable to generate a unique temporary name for swap");
6610            }
6611
6612            // Call the provided closure to make sure this name is unique!
6613            let short_id = mz_ore::id_gen::temp_id();
6614            if check_fn(&short_id) {
6615                break short_id;
6616            }
6617        };
6618
6619        Ok(name_temp)
6620    };
6621
6622    match (object_type, name_a, name_b) {
6623        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6624            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6625        }
6626        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6627            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6628        }
6629        (object_type, _, _) => Err(PlanError::Unsupported {
6630            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6631            discussion_no: None,
6632        }),
6633    }
6634}
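// Illustrative statements (hypothetical names; requires ENABLE_ALTER_SWAP):
//
//     ALTER SCHEMA blue SWAP WITH green;
//     ALTER CLUSTER blue SWAP WITH green;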
6635
6636pub fn describe_alter_retain_history(
6637    _: &StatementContext,
6638    _: AlterRetainHistoryStatement<Aug>,
6639) -> Result<StatementDesc, PlanError> {
6640    Ok(StatementDesc::new(None))
6641}
6642
6643pub fn plan_alter_retain_history(
6644    scx: &StatementContext,
6645    AlterRetainHistoryStatement {
6646        object_type,
6647        if_exists,
6648        name,
6649        history,
6650    }: AlterRetainHistoryStatement<Aug>,
6651) -> Result<Plan, PlanError> {
6652    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6653}
6654
6655fn alter_retain_history(
6656    scx: &StatementContext,
6657    object_type: ObjectType,
6658    if_exists: bool,
6659    name: UnresolvedObjectName,
6660    history: Option<WithOptionValue<Aug>>,
6661) -> Result<Plan, PlanError> {
6662    let name = match (object_type, name) {
6663        (
6664            // View gets a special error below.
6665            ObjectType::View
6666            | ObjectType::MaterializedView
6667            | ObjectType::Table
6668            | ObjectType::Source
6669            | ObjectType::Index,
6670            UnresolvedObjectName::Item(name),
6671        ) => name,
6672        (object_type, _) => {
6673            sql_bail!("{object_type} does not support RETAIN HISTORY")
6674        }
6675    };
6676    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6677        Some(entry) => {
6678            let full_name = scx.catalog.resolve_full_name(entry.name());
6679            let item_type = entry.item_type();
6680
6681            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6682            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6683                return Err(PlanError::AlterViewOnMaterializedView(
6684                    full_name.to_string(),
6685                ));
6686            } else if object_type == ObjectType::View {
6687                sql_bail!("{object_type} does not support RETAIN HISTORY")
6688            } else if object_type != item_type {
6689                sql_bail!(
6690                    "\"{}\" is a {} not a {}",
6691                    full_name,
6692                    entry.item_type(),
6693                    format!("{object_type}").to_lowercase()
6694                )
6695            }
6696
6697            // Save the original value so we can record it in the item's `create_sql` in the catalog.
6698            let (value, lcw) = match &history {
6699                Some(WithOptionValue::RetainHistoryFor(value)) => {
6700                    let window = OptionalDuration::try_from_value(value.clone())?;
6701                    (Some(value.clone()), window.0)
6702                }
6703                // None is RESET, so use the default CW.
6704                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6705                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6706            };
6707            let window = plan_retain_history(scx, lcw)?;
6708
6709            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6710                id: entry.id(),
6711                value,
6712                window,
6713                object_type,
6714            }))
6715        }
6716        None => {
6717            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6718                name: name.to_ast_string_simple(),
6719                object_type,
6720            });
6721
6722            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6723        }
6724    }
6725}
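// Statements of the shape handled above (hypothetical names):
//
//     ALTER TABLE t SET (RETAIN HISTORY FOR '2h');
//     ALTER MATERIALIZED VIEW mv RESET (RETAIN HISTORY);
//
// A RESET plans the default compaction window rather than disabling history
// tracking entirely.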
6726
6727pub fn describe_alter_secret_options(
6728    _: &StatementContext,
6729    _: AlterSecretStatement<Aug>,
6730) -> Result<StatementDesc, PlanError> {
6731    Ok(StatementDesc::new(None))
6732}
6733
6734pub fn plan_alter_secret(
6735    scx: &mut StatementContext,
6736    stmt: AlterSecretStatement<Aug>,
6737) -> Result<Plan, PlanError> {
6738    let AlterSecretStatement {
6739        name,
6740        if_exists,
6741        value,
6742    } = stmt;
6743    let object_type = ObjectType::Secret;
6744    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6745        Some(entry) => entry.id(),
6746        None => {
6747            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6748                name: name.to_string(),
6749                object_type,
6750            });
6751
6752            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6753        }
6754    };
6755
6756    let secret_as = query::plan_secret_as(scx, value)?;
6757
6758    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6759}
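// For example (hypothetical secret name), `ALTER SECRET kafka_password AS
// 'new-value'` plans the new payload via `plan_secret_as`; with IF EXISTS, a
// missing secret becomes an `AlterNoop`.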
6760
6761pub fn describe_alter_connection(
6762    _: &StatementContext,
6763    _: AlterConnectionStatement<Aug>,
6764) -> Result<StatementDesc, PlanError> {
6765    Ok(StatementDesc::new(None))
6766}
6767
6768generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6769
6770pub fn plan_alter_connection(
6771    scx: &StatementContext,
6772    stmt: AlterConnectionStatement<Aug>,
6773) -> Result<Plan, PlanError> {
6774    let AlterConnectionStatement {
6775        name,
6776        if_exists,
6777        actions,
6778        with_options,
6779    } = stmt;
6780    let conn_name = normalize::unresolved_item_name(name)?;
6781    let entry = match scx.catalog.resolve_item(&conn_name) {
6782        Ok(entry) => entry,
6783        Err(_) if if_exists => {
6784            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6785                name: conn_name.to_string(),
6786                object_type: ObjectType::Connection,
6787            });
6788
6789            return Ok(Plan::AlterNoop(AlterNoopPlan {
6790                object_type: ObjectType::Connection,
6791            }));
6792        }
6793        Err(e) => return Err(e.into()),
6794    };
6795
6796    let connection = entry.connection()?;
6797
    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
    };

    // Collect all options, irrespective of the action taken on them.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition the actions into set and drop operations.
    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Type check the values and detect duplicates; we don't want to let users,
    // e.g., drop and set the same option in the same statement, so treating
    // drops as sets here is fine.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow setting _and_ resetting mutually exclusive options.
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option in the set is either set or dropped, drop all of its
        // mutually exclusive options. We do this implicitly, even though we
        // disallow users from doing the same explicitly, because this is the
        // mechanism by which values are overwritten elsewhere in the code.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

        // n.b. if mutually exclusive options are set, they will error when we
        // try to replan the connection.
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

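/// Plans an `ALTER SINK` statement.
///
/// Only changing the sink's upstream relation is currently supported (e.g.
/// `ALTER SINK my_sink SET FROM new_relation`; the exact syntax is defined by
/// the parser). This is implemented by reconstructing the sink's original
/// `CREATE SINK` statement, incrementing its version, swapping in the new
/// relation, and replanning.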
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always ALTER objects from their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // First, reconstruct the original CREATE SINK statement.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(mut stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Then, find the existing version of the sink and increment it by one.
            let cur_version = stmt
                .with_options
                .drain_filter_swapping(|o| o.name == CreateSinkOptionName::Version)
                .map(|o| u64::try_from_value(o.value).expect("invalid sink create_sql"))
                .max()
                .unwrap_or(0);
            let new_version = cur_version + 1;
            stmt.with_options.push(CreateSinkOption {
                name: CreateSinkOptionName::Version,
                value: Some(WithOptionValue::Value(Value::Number(
                    new_version.to_string(),
                ))),
            });

            // Next, resolve the statement and swap its `from` relation for the new one.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            // Finally, replan the modified CREATE SINK statement to verify that the
            // new configuration is valid.
            let Plan::CreateSink(plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // TODO: should the statement's options be described here?
    Ok(StatementDesc::new(None))
}

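// Generates `AlterSourceAddSubsourceOptionExtracted`, which gathers the typed
// values of the `ALTER SOURCE ... ADD SUBSOURCE` options.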
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

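/// Plans an `ALTER SOURCE` statement.
///
/// Most `ALTER SOURCE` actions are handled during purification; the only
/// action planned here is altering `RETAIN HISTORY`.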
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options
                .next()
                .expect("parser ensures at least one option is present");
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            // n.b. we use this statement in purification in a way that cannot be
            // planned directly.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options
                .next()
                .expect("parser ensures at least one option is present");
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

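/// Plans an `ALTER SYSTEM SET` statement, which sets a system variable to the
/// given value.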
pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

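/// Plans an `ALTER SYSTEM RESET` statement, which restores a system variable
/// to its default value.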
pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

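/// Plans an `ALTER SYSTEM RESET ALL` statement, which restores all system
/// variables to their default values.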
pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

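/// Plans an `ALTER ROLE` statement, which modifies either a role's attributes
/// or one of its variables.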
pub fn plan_alter_role(
    _scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

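/// Plans an `ALTER TABLE ... ADD COLUMN` statement, e.g.
/// `ALTER TABLE t ADD COLUMN c text`.
///
/// Columns are always added to the latest version of the table, and new
/// columns are currently always nullable.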
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                // Always add columns to the latest version of the item.
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // TODO(alter_table): Support non-nullable columns with default values.
    let column_type = scalar_type.nullable(true);
    // "Unresolve" our data type so we can later update the persisted create_sql.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

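/// Plans a `COMMENT ON` statement, e.g. `COMMENT ON TABLE t IS 'my table'`.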
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // TODO(parkmycar): Make max comment length configurable.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // Note: the `mz_comments` table uses an `Int4` for the column position, but in
    // the catalog storage we store a `usize`, which would be a `UInt8`. We check
    // that the conversion is safe here because it's the easiest place to raise an
    // error.
    //
    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

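/// Resolves the named cluster, returning `None` instead of an error if it does
/// not exist and `if_exists` is set.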
pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

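/// Resolves the named cluster replica, returning `None` instead of an error if
/// it does not exist and `if_exists` is set.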
pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

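/// Resolves the named database, returning `None` instead of an error if it
/// does not exist and `if_exists` is set.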
pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

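/// Resolves the named schema, returning `None` instead of an error if it does
/// not exist and `if_exists` is set.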
pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

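/// Resolves the named network policy, returning `None` instead of an error if
/// it does not exist and `if_exists` is set.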
pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

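/// Resolves the named item (or type, if `object_type` is `ObjectType::Type`),
/// returning `None` instead of an error if it does not exist and `if_exists`
/// is set. Returns an error if the resolved item's type does not match
/// `object_type`.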
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns an error if the given cluster is a managed cluster.
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}