mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, SinkEnvelope,
    StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
    PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
    Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
    WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
    transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
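
// Illustrative sketch (added; not part of the original file): the pattern
// accepts exactly the names generated for managed replicas, `r` followed by
// one or more digits.
#[cfg(test)]
mod managed_replica_pattern_example {
    use super::MANAGED_REPLICA_PATTERN;

    #[test]
    fn matches_managed_replica_names() {
        assert!(MANAGED_REPLICA_PATTERN.is_match("r1"));
        assert!(MANAGED_REPLICA_PATTERN.is_match("r42"));
        assert!(!MANAGED_REPLICA_PATTERN.is_match("r"));
        assert!(!MANAGED_REPLICA_PATTERN.is_match("replica1"));
        assert!(!MANAGED_REPLICA_PATTERN.is_match("r1a"));
    }
}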

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns have types with a meaningful Persist-level ordering.
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}
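
// A minimal sketch (added for illustration; not in the original file) of the
// prefix rule enforced above. `RelationDesc::new` accepting a `Vec<String>`
// of column names mirrors its use elsewhere in this module.
#[cfg(test)]
mod check_partition_by_example {
    use super::*;

    #[test]
    fn partition_by_must_be_a_prefix() {
        let desc = RelationDesc::new(
            SqlRelationType::new(vec![
                SqlScalarType::Int64.nullable(false),
                SqlScalarType::String.nullable(true),
            ]),
            vec!["a".to_string(), "b".to_string()],
        );
        // The empty column list is trivially a valid prefix.
        assert!(check_partition_by(&desc, vec![]).is_ok());
        // `b` alone is not a prefix of `(a, b)`, so planning must fail.
        assert!(check_partition_by(&desc, vec![ident!("b")]).is_err());
    }
}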

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}
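
// For orientation (illustrative; not from the original file), the name forms
// the planner above accepts:
//
//   CREATE SCHEMA sch;      -- `sch` in the active database
//   CREATE SCHEMA db.sch;   -- `sch` in the database `db`
//
// while a three-part name such as `CREATE SCHEMA a.b.c` is rejected with
// "has more than two components".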

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
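
// Worked example (illustrative; not from the original file): for
//
//   CREATE TABLE t (a int NOT NULL, b text, PRIMARY KEY (a));
//
// the planned relation type marks `a` non-nullable (PRIMARY KEY implies NOT
// NULL), inserts the key `[0]` at the front of the key list, and records a
// NULL default for both columns. Because the key list is non-empty, planning
// also requires the `UNSAFE_ENABLE_TABLE_KEYS` feature flag, per the check
// above.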

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);
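
// Sketch of the shape `generate_extracted_config!` expands to, inferred from
// its uses below (assumed; the macro itself lives elsewhere): one typed field
// per option, a record of which options were seen, and a fallible conversion
// from the raw option list.
//
// pub struct CreateSourceOptionExtracted {
//     pub timestamp_interval: Option<Duration>,
//     pub retain_history: Option<OptionalDuration>,
//     pub seen: BTreeSet<CreateSourceOptionName>,
// }
//
// impl TryFrom<Vec<CreateSourceOption<Aug>>> for CreateSourceOptionExtracted { /* .. */ }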

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}
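
// Illustrative example (not from the original file): for
//
//   CREATE SOURCE wh IN CLUSTER c FROM WEBHOOK BODY FORMAT JSON
//     INCLUDE HEADERS;
//
// the planned relation is `(body, headers)`: `body` is always the first
// column, typed according to the BODY FORMAT, and `headers` is a
// `map[text => text]` column whose contents honor any allow/block filters
// attached to INCLUDE HEADERS.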

pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all options related to the
    // primary source output must be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let source_desc = SourceDesc::<ReferencedConnection> {
        connection: external_connection,
        timestamp_interval,
    };

    let data_source = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match source_desc.connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            DataSourceDesc::OldSyntaxIngestion {
                desc: source_desc,
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            }
        }
        None => DataSourceDesc::Ingestion(source_desc),
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
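
// End-to-end shape (illustrative; not from the original file): a statement
// like
//
//   CREATE SOURCE s IN CLUSTER c
//   FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
//   FORMAT JSON;
//
// resolves its connection through `plan_generic_source_connection`, derives
// its relation description and envelope through
// `apply_source_envelope_encoding`, resolves its cluster with
// `source_sink_cluster_config`, and lands in `Plan::CreateSource` on the
// `EpochMilliseconds` timeline; only the CDCv2 envelope yields an external
// timeline.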

pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}
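
// For example (illustrative): `TICK INTERVAL '1s'` becomes
// `tick_micros = Some(1_000_000)`, and an `UP TO` strictly less than the
// `AS OF` is rejected above before the connection is built.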

fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // text/exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}
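
// The SQL Server and Postgres planners below share this DETAILS decoding
// pipeline: hex-decode the option value, decode the protobuf message, then
// convert it into the Rust type via `RustType::from_proto`. (The encoding
// side is assumed to live in purification, which stashes the details in the
// statement's options.)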

fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // text columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _, // purified into `start_offset`
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // handled below
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}
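
// Mapping sketch (illustrative): `INCLUDE PARTITION AS p, OFFSET` yields the
// metadata columns `("p", KafkaMetadataKind::Partition)` and
// `("offset", KafkaMetadataKind::Offset)`. An `INCLUDE KEY` clause is not a
// metadata column; it is handled by the key envelope instead.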

fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is over conservative. For
            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // KEY VALUE load generators are the only UPSERT source that
    // has no encoding but defaults to `INCLUDE KEY`.
    //
    // As discussed
    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
    // removing this special case amounts to deciding how to handle null keys
    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
    // this special case in.
    //
    // Note that this is safe because this generator is
    // 1. The only source with no encoding that can have its key included.
    // 2. Never produces null keys (or values, for that matter).
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    // Not all source envelopes are compatible with all source connections.
    // Whoever constructs the source ingestion pipeline is responsible for
    // choosing compatible envelopes and connections.
    //
    // TODO(guswynn): unambiguously assert which connections and envelopes are
    // compatible in typechecking
    //
    // TODO: remove bails as more support for upsert is added.
    let envelope = match &envelope {
        // TODO: fixup key envelope
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            // TODO: check that key envelope is not set
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
            // specified.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // If the value decode error policy is not set, we use the default
            // upsert style.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            // TODO: check that key envelope is not set
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

/// Plans the `RelationDesc` for a source export (subsource or table) that has
/// a defined list of columns and constraints.
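///
/// As an illustrative sketch (not drawn from a real catalog), a definition
/// like `(a int4 NOT NULL, b text, PRIMARY KEY (a))` would plan to a
/// `RelationDesc` whose first column is non-nullable and whose leading key is
/// `[0]`; a non-primary `UNIQUE` constraint only contributes a key when all
/// of its columns are `NOT NULL` or it is declared `NULLS NOT DISTINCT`.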
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

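/// Plans a `CREATE SUBSOURCE` statement.
///
/// Subsource statements are generated internally during purification of a
/// `CREATE SOURCE` statement rather than typed by users, so the accepted
/// shapes are constrained. As a rough, illustrative sketch: a non-progress
/// subsource carries an `OF SOURCE` reference plus `EXTERNAL REFERENCE` and
/// `DETAILS` options, while a progress subsource carries only
/// `PROGRESS = true`.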
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // This invariant is enforced during purification; we are responsible for
    // creating the AST for subsources as a response to CREATE SOURCE
    // statements, so this would fire in integration testing if we failed to
    // uphold it.
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // If the new source table syntax is forced we should not be creating any non-progress
        // subsources.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        // This is a subsource with the "natural" dependency order, i.e. it is
        // not a legacy subsource with the inverted structure.
        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        // Decode the details option stored on the subsource statement, which contains information
        // created during the purification process.
        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources don't currently support non-default envelopes / encoding
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);

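/// Plans a `CREATE TABLE ... FROM SOURCE` statement, the table-based
/// replacement for subsources.
///
/// As a hedged, illustrative sketch (names are hypothetical), such a
/// statement references an existing source and an upstream object discovered
/// during purification, e.g.
/// `CREATE TABLE t FROM SOURCE pg_src (REFERENCE public.t)`.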
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // Decode the details option stored on the statement, which contains information
    // created during the purification process.
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                // TODO(guswynn): should this be `bail_unsupported!`?
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // handled below
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    // Some source types (e.g. postgres, mysql, multi-output load-gen sources) define a value
    // schema during purification and populate the statement's `columns` and `constraints`
    // fields, whereas other source types (e.g. kafka, single-output load-gen sources) do not,
    // so we instead use the source connection's default schema.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Determine the timeline for the table: CDCv2 sources manage their own
    // (external) timeline, while everything else runs on the epoch-milliseconds
    // timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}

generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);

impl LoadGeneratorOptionExtracted {
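    /// Rejects any option that the given load generator variant does not
    /// accept. For example (illustrative), `COUNTER` permits `MAX CARDINALITY`
    /// but would reject `SCALE FACTOR`, which only `TPCH` accepts.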
    pub(super) fn ensure_only_valid_options(
        &self,
        loadgen: &ast::LoadGenerator,
    ) -> Result<(), PlanError> {
        use mz_sql_parser::ast::LoadGeneratorOptionName::*;

        let mut options = self.seen.clone();

        let permitted_options: &[_] = match loadgen {
            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
            ast::LoadGenerator::KeyValue => &[
                TickInterval,
                Keys,
                SnapshotRounds,
                TransactionalSnapshot,
                ValueSize,
                Seed,
                Partitions,
                BatchSize,
            ],
        };

        for o in permitted_options {
            options.remove(o);
        }

        if !options.is_empty() {
            sql_bail!(
                "{} load generators do not support {} values",
                loadgen,
                options.iter().join(", ")
            )
        }

        Ok(())
    }
}

pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    extracted.ensure_only_valid_options(loadgen)?;

    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&crate::session::vars::ENABLE_CLOCK_LOAD_GENERATOR)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => LoadGenerator::Datums,
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            // Default to 0.01 scale factor (=10MB).
            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

            // The multiplications here are safely unchecked because they will
            // overflow to infinity, which will be caught by `f_to_i`.
            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;
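            // Worked example (assuming the default sf = 0.01): this yields
            // 100 suppliers, 2,000 parts, 1,500 customers, 15,000 orders, and
            // 10 clerks.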

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,

                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
                    }
                };
            }

            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                // Defaults to true.
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("KEYS must be at least as large as BATCH SIZE")
            }

            // This constraint simplifies the source implementation.
            // We can lift it later.
            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

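            // An illustrative configuration that satisfies all of the checks
            // above: KEYS = 128, PARTITIONS = 4, BATCH SIZE = 8, since
            // 128 % 4 == 0, 8 <= 128, and (128 / 4) % 8 == 0.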
            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}

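/// Verifies that a value relation has the shape Debezium requires: an `after`
/// column, and optionally a `before` column of the same record type. Returns
/// the indexes of both columns.
///
/// For example (illustrative, not a real schema): a description with columns
/// `(before record?, after record)` yields `(Some(0), 1)`.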
fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
    let before = value_desc.get_by_name(&"before".into());
    let (after_idx, after_ty) = value_desc
        .get_by_name(&"after".into())
        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
    let before_idx = if let Some((before_idx, before_ty)) = before {
        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
            sql_bail!("'before' column must be of type record");
        }
        if before_ty != after_ty {
            sql_bail!("'before' column type differs from 'after' column type");
        }
        Some(before_idx)
    } else {
        None
    };
    Ok((before_idx, after_idx))
}

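/// Converts a `FORMAT`/`KEY FORMAT ... VALUE FORMAT` specifier into a
/// `SourceDataEncoding`, enforcing that `ENVELOPE DEBEZIUM` and
/// `ENVELOPE UPSERT` are given a key as well as a value format.
///
/// A hedged sketch of the two accepted shapes: a bare `FORMAT JSON` yields a
/// value-only encoding, while `KEY FORMAT TEXT VALUE FORMAT JSON` yields both
/// a key and a value encoding.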
fn get_encoding(
    scx: &StatementContext,
    format: &FormatSpecifier<Aug>,
    envelope: &ast::SourceEnvelope,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let encoding = match format {
        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
        FormatSpecifier::KeyValue { key, value } => {
            let key = {
                let encoding = get_encoding_inner(scx, key)?;
                Some(encoding.key.unwrap_or(encoding.value))
            };
            let value = get_encoding_inner(scx, value)?.value;
            SourceDataEncoding { key, value }
        }
    };

    let requires_keyvalue = matches!(
        envelope,
        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
    );
    let is_keyvalue = encoding.key.is_some();
    if requires_keyvalue && !is_keyvalue {
        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
    };

    Ok(encoding)
}

/// Determine the cluster ID to use for this item.
///
/// If `in_cluster` is `None` we will update it to refer to the default cluster.
/// Because of this, do not normalize/canonicalize the create SQL statement
/// until after calling this function.
fn source_sink_cluster_config<'a, 'ctx>(
    scx: &'a StatementContext<'ctx>,
    in_cluster: &mut Option<ResolvedClusterName>,
) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
    let cluster = match in_cluster {
        None => {
            let cluster = scx.catalog.resolve_cluster(None)?;
            *in_cluster = Some(ResolvedClusterName {
                id: cluster.id(),
                print_name: None,
            });
            cluster
        }
        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
    };

    Ok(cluster)
}

generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));

#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    pub confluent_wire_format: bool,
}

fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                // TODO(jldlaughlin): we need a way to pass in primary key information
                // when building a source from a string or file.
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}

/// Extracts the key envelope, if one is requested.
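///
/// For example (illustrative): `INCLUDE KEY AS k` with a keyed format yields
/// `KeyEnvelope::Named("k")`, while a bare `INCLUDE KEY` with only a value
/// `FORMAT` is rejected, since there is no key format to decode.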
fn get_key_envelope(
    included_items: &[SourceIncludeMetadata],
    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
    key_envelope_no_encoding: bool,
) -> Result<KeyEnvelope, PlanError> {
    let key_definition = included_items
        .iter()
        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
        match (alias, encoding.and_then(|e| e.key.as_ref())) {
            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
            (Some(name), _) if key_envelope_no_encoding => {
                Ok(KeyEnvelope::Named(name.as_str().to_string()))
            }
            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
            (_, None) => {
                // `alias == None` means `INCLUDE KEY`;
                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
                // Both cases warrant the same error message.
                sql_bail!(
                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
                        got bare FORMAT"
                );
            }
        }
    } else {
        Ok(KeyEnvelope::None)
    }
}

/// Gets the key envelope for a given key encoding when no name for the key has
/// been requested by the user.
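///
/// For example (illustrative): a `TEXT`-encoded key becomes a single column
/// named `key`, whereas an Avro record key is flattened into its constituent
/// columns.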
fn get_unnamed_key_envelope(
    key: Option<&DataEncoding<ReferencedConnection>>,
) -> Result<KeyEnvelope, PlanError> {
    // If the key is requested but comes from an unnamed type then it gets the name "key".
    //
    // Otherwise it gets the names of the columns in the type.
    let is_composite = match key {
        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
        Some(
            DataEncoding::Avro(_)
            | DataEncoding::Csv(_)
            | DataEncoding::Protobuf(_)
            | DataEncoding::Regex { .. },
        ) => true,
        None => false,
    };

    if is_composite {
        Ok(KeyEnvelope::Flattened)
    } else {
        Ok(KeyEnvelope::Named("key".to_string()))
    }
}

pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

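/// Plans the `ViewDefinition` shared by `CREATE VIEW` and
/// `CREATE OR REPLACE VIEW`, returning the view's qualified name and the
/// planned `View`.
///
/// As an illustrative sketch, `CREATE VIEW v (a, b) AS SELECT ...` plans the
/// query, renames the result columns to `a` and `b`, and rejects duplicate
/// column names as well as query parameters.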
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    // We get back a trivial finishing because `plan_root_query` applies the given finishing.
    // Note: earlier we considered persisting the finishing information with the view here to
    // help with database-issues#236. However, in the meantime, there might be a better
    // approach to solving database-issues#236:
    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2547    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2548        &finishing,
2549        expr.arity()
2550    ));
2551    if expr.contains_parameters()? {
2552        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2553    }
2554
2555    let dependencies = expr
2556        .depends_on()
2557        .into_iter()
2558        .map(|gid| scx.catalog.resolve_item_id(&gid))
2559        .collect();
2560
2561    let name = if temporary {
2562        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2563    } else {
2564        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2565    };
2566
2567    plan_utils::maybe_rename_columns(
2568        format!("view {}", scx.catalog.resolve_full_name(&name)),
2569        &mut desc,
2570        columns,
2571    )?;
2572    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2573
2574    if let Some(dup) = names.iter().duplicates().next() {
2575        sql_bail!("column {} specified more than once", dup.quoted());
2576    }
2577
2578    let view = View {
2579        create_sql,
2580        expr,
2581        dependencies,
2582        column_names: names,
2583        temporary,
2584    };
2585
2586    Ok((name, view))
2587}
2588
2589pub fn plan_create_view(
2590    scx: &StatementContext,
2591    mut stmt: CreateViewStatement<Aug>,
2592) -> Result<Plan, PlanError> {
2593    let CreateViewStatement {
2594        temporary,
2595        if_exists,
2596        definition,
2597    } = &mut stmt;
2598    let (name, view) = plan_view(scx, definition, *temporary)?;
2599
2600    // Override the statement-level IfExistsBehavior with Skip if this is
2601    // explicitly requested in the PlanContext (the default is `false`).
2602    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2603
2604    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2605        let if_exists = true;
2606        let cascade = false;
2607        let maybe_item_to_drop = plan_drop_item(
2608            scx,
2609            ObjectType::View,
2610            if_exists,
2611            definition.name.clone(),
2612            cascade,
2613        )?;
2614
2615        // Check if the new View depends on the item that we would be replacing.
2616        if let Some(id) = maybe_item_to_drop {
2617            let dependencies = view.expr.depends_on();
2618            let invalid_drop = scx
2619                .get_item(&id)
2620                .global_ids()
2621                .any(|gid| dependencies.contains(&gid));
2622            if invalid_drop {
2623                let item = scx.catalog.get_item(&id);
2624                sql_bail!(
2625                    "cannot replace view {0}: depended upon by new {0} definition",
2626                    scx.catalog.resolve_full_name(item.name())
2627                );
2628            }
2629
2630            Some(id)
2631        } else {
2632            None
2633        }
2634    } else {
2635        None
2636    };
2637    let drop_ids = replace
2638        .map(|id| {
2639            scx.catalog
2640                .item_dependents(id)
2641                .into_iter()
2642                .map(|id| id.unwrap_item_id())
2643                .collect()
2644        })
2645        .unwrap_or_default();
2646
2647    // Check for an object in the catalog with this same name
2648    let full_name = scx.catalog.resolve_full_name(&name);
2649    let partial_name = PartialItemName::from(full_name.clone());
2650    // For PostgreSQL compatibility, we need to prevent creating views when
2651    // there is an existing object *or* type of the same name.
2652    if let (Ok(item), IfExistsBehavior::Error, false) = (
2653        scx.catalog.resolve_item_or_type(&partial_name),
2654        *if_exists,
2655        ignore_if_exists_errors,
2656    ) {
2657        return Err(PlanError::ItemAlreadyExists {
2658            name: full_name.to_string(),
2659            item_type: item.item_type(),
2660        });
2661    }
2662
2663    Ok(Plan::CreateView(CreateViewPlan {
2664        name,
2665        view,
2666        replace,
2667        drop_ids,
2668        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2669        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2670    }))
2671}
2672
2673pub fn describe_create_materialized_view(
2674    _: &StatementContext,
2675    _: CreateMaterializedViewStatement<Aug>,
2676) -> Result<StatementDesc, PlanError> {
2677    Ok(StatementDesc::new(None))
2678}
2679
2680pub fn describe_create_continual_task(
2681    _: &StatementContext,
2682    _: CreateContinualTaskStatement<Aug>,
2683) -> Result<StatementDesc, PlanError> {
2684    Ok(StatementDesc::new(None))
2685}
2686
2687pub fn describe_create_network_policy(
2688    _: &StatementContext,
2689    _: CreateNetworkPolicyStatement<Aug>,
2690) -> Result<StatementDesc, PlanError> {
2691    Ok(StatementDesc::new(None))
2692}
2693
2694pub fn describe_alter_network_policy(
2695    _: &StatementContext,
2696    _: AlterNetworkPolicyStatement<Aug>,
2697) -> Result<StatementDesc, PlanError> {
2698    Ok(StatementDesc::new(None))
2699}
2700
2701pub fn plan_create_materialized_view(
2702    scx: &StatementContext,
2703    mut stmt: CreateMaterializedViewStatement<Aug>,
2704) -> Result<Plan, PlanError> {
2705    let cluster_id =
2706        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2707    stmt.in_cluster = Some(ResolvedClusterName {
2708        id: cluster_id,
2709        print_name: None,
2710    });
2711
2712    let create_sql =
2713        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2714
2715    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2716    let name = scx.allocate_qualified_name(partial_name.clone())?;
2717
2718    let query::PlannedRootQuery {
2719        expr,
2720        mut desc,
2721        finishing,
2722        scope: _,
2723    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2724    // We get back a trivial finishing, see comment in `plan_view`.
2725    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2726        &finishing,
2727        expr.arity()
2728    ));
2729    if expr.contains_parameters()? {
2730        return Err(PlanError::ParameterNotAllowed(
2731            "materialized views".to_string(),
2732        ));
2733    }
2734
2735    plan_utils::maybe_rename_columns(
2736        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2737        &mut desc,
2738        &stmt.columns,
2739    )?;
2740    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2741
2742    let MaterializedViewOptionExtracted {
2743        assert_not_null,
2744        partition_by,
2745        retain_history,
2746        refresh,
2747        seen: _,
2748    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2749
2750    if let Some(partition_by) = partition_by {
2751        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2752        check_partition_by(&desc, partition_by)?;
2753    }
2754
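    // Assemble the refresh schedule from the extracted REFRESH options. For
    // example (illustrative): `REFRESH EVERY '1 day' ALIGNED TO '2025-01-01'`
    // pushes an entry onto `everies`, each `REFRESH AT <time>` pushes a
    // timestamp onto `ats`, and a lone `REFRESH ON COMMIT` leaves the schedule
    // at its default, which is represented as `None` below.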
2755    let refresh_schedule = {
2756        let mut refresh_schedule = RefreshSchedule::default();
2757        let mut on_commits_seen = 0;
2758        for refresh_option_value in refresh {
2759            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2760                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2761            }
2762            match refresh_option_value {
2763                RefreshOptionValue::OnCommit => {
2764                    on_commits_seen += 1;
2765                }
2766                RefreshOptionValue::AtCreation => {
2767                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2768                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2769                }
2770                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2771                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2772                    let ecx = &ExprContext {
2773                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2774                        name: "REFRESH AT",
2775                        scope: &Scope::empty(),
2776                        relation_type: &SqlRelationType::empty(),
2777                        allow_aggregates: false,
2778                        allow_subqueries: false,
2779                        allow_parameters: false,
2780                        allow_windows: false,
2781                    };
2782                    let hir = plan_expr(ecx, &time)?.cast_to(
2783                        ecx,
2784                        CastContext::Assignment,
2785                        &SqlScalarType::MzTimestamp,
2786                    )?;
2787                    // (mz_now was purified away to a literal earlier)
2788                    let timestamp = hir
2789                        .into_literal_mz_timestamp()
2790                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2791                    refresh_schedule.ats.push(timestamp);
2792                }
2793                RefreshOptionValue::Every(RefreshEveryOptionValue {
2794                    interval,
2795                    aligned_to,
2796                }) => {
2797                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2798                    if interval.as_microseconds() <= 0 {
2799                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2800                    }
2801                    if interval.months != 0 {
2802                        // This limitation exists because we want intervals to be cleanly
2803                        // convertible to a Unix epoch timestamp difference. That no longer
2804                        // holds when the interval involves months, because months have
2805                        // variable lengths. See `Timestamp::round_up`.
2806                        sql_bail!("REFRESH interval must not involve units larger than days");
2807                    }
2808                    let interval = interval.duration()?;
2809                    if u64::try_from(interval.as_millis()).is_err() {
2810                        sql_bail!("REFRESH interval too large");
2811                    }
2812
2813                    let mut aligned_to = match aligned_to {
2814                        Some(aligned_to) => aligned_to,
2815                        None => {
2816                            soft_panic_or_log!(
2817                                "ALIGNED TO should have been filled in by purification"
2818                            );
2819                            sql_bail!(
2820                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2821                            )
2822                        }
2823                    };
2824
2825                    // Desugar the `aligned_to` expression
2826                    transform_ast::transform(scx, &mut aligned_to)?;
2827
2828                    let ecx = &ExprContext {
2829                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2830                        name: "REFRESH EVERY ... ALIGNED TO",
2831                        scope: &Scope::empty(),
2832                        relation_type: &SqlRelationType::empty(),
2833                        allow_aggregates: false,
2834                        allow_subqueries: false,
2835                        allow_parameters: false,
2836                        allow_windows: false,
2837                    };
2838                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2839                        ecx,
2840                        CastContext::Assignment,
2841                        &SqlScalarType::MzTimestamp,
2842                    )?;
2843                    // (mz_now was purified away to a literal earlier)
2844                    let aligned_to_const = aligned_to_hir
2845                        .into_literal_mz_timestamp()
2846                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2847
2848                    refresh_schedule.everies.push(RefreshEvery {
2849                        interval,
2850                        aligned_to: aligned_to_const,
2851                    });
2852                }
2853            }
2854        }
2855
2856        if on_commits_seen > 1 {
2857            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2858        }
2859        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2860            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2861        }
2862
2863        if refresh_schedule == RefreshSchedule::default() {
2864            None
2865        } else {
2866            Some(refresh_schedule)
2867        }
2868    };
2869
2870    let as_of = stmt.as_of.map(Timestamp::from);
2871    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2872    let mut non_null_assertions = assert_not_null
2873        .into_iter()
2874        .map(normalize::column_name)
2875        .map(|assertion_name| {
2876            column_names
2877                .iter()
2878                .position(|col| col == &assertion_name)
2879                .ok_or_else(|| {
2880                    sql_err!(
2881                        "column {} in ASSERT NOT NULL option not found",
2882                        assertion_name.quoted()
2883                    )
2884                })
2885        })
2886        .collect::<Result<Vec<_>, _>>()?;
2887    non_null_assertions.sort();
2888    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2889        let dup = &column_names[*dup];
2890        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2891    }
2892
2893    if let Some(dup) = column_names.iter().duplicates().next() {
2894        sql_bail!("column {} specified more than once", dup.quoted());
2895    }
2896
2897    // Override the statement-level IfExistsBehavior with Skip if this is
2898    // explicitly requested in the PlanContext (the default is `false`).
2899    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2900        Ok(true) => IfExistsBehavior::Skip,
2901        _ => stmt.if_exists,
2902    };
2903
2904    let mut replace = None;
2905    let mut if_not_exists = false;
2906    match if_exists {
2907        IfExistsBehavior::Replace => {
2908            let if_exists = true;
2909            let cascade = false;
2910            let replace_id = plan_drop_item(
2911                scx,
2912                ObjectType::MaterializedView,
2913                if_exists,
2914                partial_name.into(),
2915                cascade,
2916            )?;
2917
2918            // Check if the new materialized view depends on the item that we would be replacing.
2919            if let Some(id) = replace_id {
2920                let dependencies = expr.depends_on();
2921                let invalid_drop = scx
2922                    .get_item(&id)
2923                    .global_ids()
2924                    .any(|gid| dependencies.contains(&gid));
2925                if invalid_drop {
2926                    let item = scx.catalog.get_item(&id);
2927                    sql_bail!(
2928                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2929                        scx.catalog.resolve_full_name(item.name())
2930                    );
2931                }
2932                replace = Some(id);
2933            }
2934        }
2935        IfExistsBehavior::Skip => if_not_exists = true,
2936        IfExistsBehavior::Error => (),
2937    }
2938    let drop_ids = replace
2939        .map(|id| {
2940            scx.catalog
2941                .item_dependents(id)
2942                .into_iter()
2943                .map(|id| id.unwrap_item_id())
2944                .collect()
2945        })
2946        .unwrap_or_default();
2947    let dependencies = expr
2948        .depends_on()
2949        .into_iter()
2950        .map(|gid| scx.catalog.resolve_item_id(&gid))
2951        .collect();
2952
2953    // Check for an object in the catalog with this same name
2954    let full_name = scx.catalog.resolve_full_name(&name);
2955    let partial_name = PartialItemName::from(full_name.clone());
2956    // For PostgreSQL compatibility, we need to prevent creating materialized
2957    // views when there is an existing object *or* type of the same name.
2958    if let (IfExistsBehavior::Error, Ok(item)) =
2959        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
2960    {
2961        return Err(PlanError::ItemAlreadyExists {
2962            name: full_name.to_string(),
2963            item_type: item.item_type(),
2964        });
2965    }
2966
2967    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
2968        name,
2969        materialized_view: MaterializedView {
2970            create_sql,
2971            expr,
2972            dependencies,
2973            column_names,
2974            cluster_id,
2975            non_null_assertions,
2976            compaction_window,
2977            refresh_schedule,
2978            as_of,
2979        },
2980        replace,
2981        drop_ids,
2982        if_not_exists,
2983        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2984    }))
2985}
2986
2987generate_extracted_config!(
2988    MaterializedViewOption,
2989    (AssertNotNull, Ident, AllowMultiple),
2990    (PartitionBy, Vec<Ident>),
2991    (RetainHistory, OptionalDuration),
2992    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
2993);
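// As a rough, illustrative sketch (the real expansion also derives impls and a
// `TryFrom<Vec<MaterializedViewOption<Aug>>>` conversion), the macro above
// generates a struct shaped like:
//
//     struct MaterializedViewOptionExtracted {
//         seen: BTreeSet<MaterializedViewOptionName>,
//         assert_not_null: Vec<Ident>,
//         partition_by: Option<Vec<Ident>>,
//         retain_history: Option<OptionalDuration>,
//         refresh: Vec<RefreshOptionValue<Aug>>,
//     }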
2994
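/// Plans a `CREATE CONTINUAL TASK` statement.
///
/// A hypothetical example of the statement shape handled here (names are
/// illustrative):
///
/// ```sql
/// CREATE CONTINUAL TASK ct (key int, val int) ON INPUT t AS (
///     INSERT INTO ct SELECT key, val FROM t;
///     DELETE FROM ct WHERE key IN (SELECT key FROM t);
/// );
/// ```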
2995pub fn plan_create_continual_task(
2996    scx: &StatementContext,
2997    mut stmt: CreateContinualTaskStatement<Aug>,
2998) -> Result<Plan, PlanError> {
2999    match &stmt.sugar {
3000        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3001        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3002            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3003        }
3004        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3005            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3006        }
3007    };
3008    let cluster_id = match &stmt.in_cluster {
3009        None => scx.catalog.resolve_cluster(None)?.id(),
3010        Some(in_cluster) => in_cluster.id,
3011    };
3012    stmt.in_cluster = Some(ResolvedClusterName {
3013        id: cluster_id,
3014        print_name: None,
3015    });
3016
3017    let create_sql =
3018        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3019
3020    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3021
3022    // It seems desirable for a CT that e.g. simply filters the input to keep
3023    // the same nullability. So, start by assuming all columns are non-nullable,
3024    // and then make them nullable below if any of the exprs plan them as
3025    // nullable.
3026    let mut desc = match stmt.columns {
3027        None => None,
3028        Some(columns) => {
3029            let mut desc_columns = Vec::with_capacity(columns.len());
3030            for col in columns.iter() {
3031                desc_columns.push((
3032                    normalize::column_name(col.name.clone()),
3033                    SqlColumnType {
3034                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3035                        nullable: false,
3036                    },
3037                ));
3038            }
3039            Some(RelationDesc::from_names_and_types(desc_columns))
3040        }
3041    };
3042    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3043    match input.item_type() {
3044        // The input must be an object directly backed by a persist shard, so we
3045        // can use a persist listen to rehydrate it efficiently.
3046        CatalogItemType::ContinualTask
3047        | CatalogItemType::Table
3048        | CatalogItemType::MaterializedView
3049        | CatalogItemType::Source => {}
3050        CatalogItemType::Sink
3051        | CatalogItemType::View
3052        | CatalogItemType::Index
3053        | CatalogItemType::Type
3054        | CatalogItemType::Func
3055        | CatalogItemType::Secret
3056        | CatalogItemType::Connection => {
3057            sql_bail!(
3058                "CONTINUAL TASK cannot use {} as an input",
3059                input.item_type()
3060            );
3061        }
3062    }
3063
3064    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3065    let ct_name = stmt.name;
3066    let placeholder_id = match &ct_name {
3067        ResolvedItemName::ContinualTask { id, name } => {
3068            let desc = match desc.as_ref().cloned() {
3069                Some(x) => x,
3070                None => {
3071                    // The user didn't specify the CT's columns. Take a wild
3072                    // guess that the CT has the same shape as the input. It's
3073                    // fine if this is wrong, we'll get an error below after
3074                    // planning the query.
3075                    let input_name = scx.catalog.resolve_full_name(input.name());
3076                    input.desc(&input_name)?.into_owned()
3077                }
3078            };
3079            qcx.ctes.insert(
3080                *id,
3081                CteDesc {
3082                    name: name.item.clone(),
3083                    desc,
3084                },
3085            );
3086            Some(*id)
3087        }
3088        _ => None,
3089    };
3090
3091    let mut exprs = Vec::new();
3092    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3093        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3094        let query::PlannedRootQuery {
3095            expr,
3096            desc: desc_query,
3097            finishing,
3098            scope: _,
3099        } = query::plan_ct_query(&mut qcx, query)?;
3100        // We get back a trivial finishing because we plan with a "maintained"
3101        // QueryLifetime; see the comment in `plan_view`.
3102        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3103            &finishing,
3104            expr.arity()
3105        ));
3106        if expr.contains_parameters()? {
3107            return Err(PlanError::ParameterNotAllowed(
3108                "continual tasks".to_string(),
3109            ));
3110        }
3113        let expr = match desc.as_mut() {
3114            None => {
3115                desc = Some(desc_query);
3116                expr
3117            }
3118            Some(desc) => {
3119                // We specify the columns for DELETE, so if any column types don't
3120                // match, it must be an INSERT.
3121                if desc_query.arity() > desc.arity() {
3122                    sql_bail!(
3123                        "statement {}: INSERT has more expressions than target columns",
3124                        idx
3125                    );
3126                }
3127                if desc_query.arity() < desc.arity() {
3128                    sql_bail!(
3129                        "statement {}: INSERT has more target columns than expressions",
3130                        idx
3131                    );
3132                }
3133                // Ensure the types of the source query match the types of the target table,
3134                // installing assignment casts where necessary and possible.
3135                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3136                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3137                let expr = expr.map_err(|e| {
3138                    sql_err!(
3139                        "statement {}: column {} is of type {} but expression is of type {}",
3140                        idx,
3141                        desc.get_name(e.column).quoted(),
3142                        qcx.humanize_scalar_type(&e.target_type, false),
3143                        qcx.humanize_scalar_type(&e.source_type, false),
3144                    )
3145                })?;
3146
3147                // Update CT nullability as necessary. The arity checks above verified
3148                // that the two types have the same length.
3149                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3150                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3151                if updated {
3152                    let new_types = zip_types().map(|(ct, q)| {
3153                        let mut ct = ct.clone();
3154                        if q.nullable {
3155                            ct.nullable = true;
3156                        }
3157                        ct
3158                    });
3159                    *desc = RelationDesc::from_names_and_types(
3160                        desc.iter_names().cloned().zip_eq(new_types),
3161                    );
3162                }
3163
3164                expr
3165            }
3166        };
3167        match stmt {
3168            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3169            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3170        }
3171    }
3172    // TODO(ct3): Collect things by output and assert that there is only one (or
3173    // support multiple outputs).
3174    let expr = exprs
3175        .into_iter()
3176        .reduce(|acc, expr| acc.union(expr))
3177        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3178    let dependencies = expr
3179        .depends_on()
3180        .into_iter()
3181        .map(|gid| scx.catalog.resolve_item_id(&gid))
3182        .collect();
3183
3184    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3185    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3186    if let Some(dup) = column_names.iter().duplicates().next() {
3187        sql_bail!("column {} specified more than once", dup.quoted());
3188    }
3189
3190    // Check for an object in the catalog with this same name
3191    let name = match &ct_name {
3192        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3193        ResolvedItemName::ContinualTask { name, .. } => {
3194            let name = scx.allocate_qualified_name(name.clone())?;
3195            let full_name = scx.catalog.resolve_full_name(&name);
3196            let partial_name = PartialItemName::from(full_name.clone());
3197            // For PostgreSQL compatibility, we need to prevent creating this when there
3198            // is an existing object *or* type of the same name.
3199            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3200                return Err(PlanError::ItemAlreadyExists {
3201                    name: full_name.to_string(),
3202                    item_type: item.item_type(),
3203                });
3204            }
3205            name
3206        }
3207        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3208        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3209    };
3210
3211    let as_of = stmt.as_of.map(Timestamp::from);
3212    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3213        name,
3214        placeholder_id,
3215        desc,
3216        input_id: input.global_id(),
3217        with_snapshot: snapshot.unwrap_or(true),
3218        continual_task: MaterializedView {
3219            create_sql,
3220            expr,
3221            dependencies,
3222            column_names,
3223            cluster_id,
3224            non_null_assertions: Vec::new(),
3225            compaction_window: None,
3226            refresh_schedule: None,
3227            as_of,
3228        },
3229    }))
3230}
3231
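/// Extracts the `SELECT` query embedded in a continual task `INSERT` or
/// `DELETE` statement, if the statement is of a supported shape.
///
/// For example (illustrative), `DELETE FROM ct WHERE x > 5` is rewritten to
/// `SELECT * FROM ct WHERE x > 5`; the caller then negates the planned query
/// to turn the selected rows into retractions.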
3232fn continual_task_query<'a>(
3233    ct_name: &ResolvedItemName,
3234    stmt: &'a ast::ContinualTaskStmt<Aug>,
3235) -> Option<ast::Query<Aug>> {
3236    match stmt {
3237        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3238            table_name: _,
3239            columns,
3240            source,
3241            returning,
3242        }) => {
3243            if !columns.is_empty() || !returning.is_empty() {
3244                return None;
3245            }
3246            match source {
3247                ast::InsertSource::Query(query) => Some(query.clone()),
3248                ast::InsertSource::DefaultValues => None,
3249            }
3250        }
3251        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3252            table_name: _,
3253            alias,
3254            using,
3255            selection,
3256        }) => {
3257            if !using.is_empty() {
3258                return None;
3259            }
3260            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
3261            let from = ast::TableWithJoins {
3262                relation: ast::TableFactor::Table {
3263                    name: ct_name.clone(),
3264                    alias: alias.clone(),
3265                },
3266                joins: Vec::new(),
3267            };
3268            let select = ast::Select {
3269                from: vec![from],
3270                selection: selection.clone(),
3271                distinct: None,
3272                projection: vec![ast::SelectItem::Wildcard],
3273                group_by: Vec::new(),
3274                having: None,
3275                qualify: None,
3276                options: Vec::new(),
3277            };
3278            let query = ast::Query {
3279                ctes: ast::CteBlock::Simple(Vec::new()),
3280                body: ast::SetExpr::Select(Box::new(select)),
3281                order_by: Vec::new(),
3282                limit: None,
3283                offset: None,
3284            };
3285            // Then negate it to turn it into retractions (after planning it).
3286            Some(query)
3287        }
3288    }
3289}
3290
3291generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3292
3293pub fn describe_create_sink(
3294    _: &StatementContext,
3295    _: CreateSinkStatement<Aug>,
3296) -> Result<StatementDesc, PlanError> {
3297    Ok(StatementDesc::new(None))
3298}
3299
3300generate_extracted_config!(
3301    CreateSinkOption,
3302    (Snapshot, bool),
3303    (PartitionStrategy, String),
3304    (Version, u64)
3305);
3306
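/// Plans a `CREATE SINK` statement. A hypothetical example of the shape
/// handled here and in `plan_sink` below (names are illustrative):
///
/// ```sql
/// CREATE SINK s FROM mv
///     INTO KAFKA CONNECTION k (TOPIC 'events')
///     KEY (id)
///     FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr
///     ENVELOPE UPSERT;
/// ```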
3307pub fn plan_create_sink(
3308    scx: &StatementContext,
3309    stmt: CreateSinkStatement<Aug>,
3310) -> Result<Plan, PlanError> {
3311    // Check for an object in the catalog with this same name
3312    let Some(name) = stmt.name.clone() else {
3313        return Err(PlanError::MissingName(CatalogItemType::Sink));
3314    };
3315    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3316    let full_name = scx.catalog.resolve_full_name(&name);
3317    let partial_name = PartialItemName::from(full_name.clone());
3318    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3319        return Err(PlanError::ItemAlreadyExists {
3320            name: full_name.to_string(),
3321            item_type: item.item_type(),
3322        });
3323    }
3324
3325    plan_sink(scx, stmt)
3326}
3327
3328/// Plans a sink as if it does not already exist in the catalog, so that the
3329/// planning logic can be shared by both CREATE SINK and ALTER SINK. It is the
3330/// callers' responsibility (`plan_create_sink` and `plan_alter_sink`) to check
3331/// for name collisions where that matters.
3332fn plan_sink(
3333    scx: &StatementContext,
3334    mut stmt: CreateSinkStatement<Aug>,
3335) -> Result<Plan, PlanError> {
3336    let CreateSinkStatement {
3337        name,
3338        in_cluster: _,
3339        from,
3340        connection,
3341        format,
3342        envelope,
3343        if_not_exists,
3344        with_options,
3345    } = stmt.clone();
3346
3347    let Some(name) = name else {
3348        return Err(PlanError::MissingName(CatalogItemType::Sink));
3349    };
3350    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3351
3352    let envelope = match envelope {
3353        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3354        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3355        None => sql_bail!("ENVELOPE clause is required"),
3356    };
3357
3358    let from_name = &from;
3359    let from = scx.get_item_by_resolved_name(&from)?;
3360    if from.id().is_system() {
3361        bail_unsupported!("creating a sink directly on a catalog object");
3362    }
3363    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
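    // Plan the user-declared key, if any. For example (illustrative): with
    // `ENVELOPE UPSERT`, `KEY (user_id)` must correspond to a known unique key
    // of the sinked relation, while `KEY (user_id) NOT ENFORCED` downgrades
    // the mismatch from an error to a notice.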
3364    let key_indices = match &connection {
3365        CreateSinkConnection::Kafka { key, .. } => {
3366            if let Some(key) = key.clone() {
3367                let key_columns = key
3368                    .key_columns
3369                    .into_iter()
3370                    .map(normalize::column_name)
3371                    .collect::<Vec<_>>();
3372                let mut uniq = BTreeSet::new();
3373                for col in key_columns.iter() {
3374                    if !uniq.insert(col) {
3375                        sql_bail!("duplicate column referenced in KEY: {}", col);
3376                    }
3377                }
3378                let indices = key_columns
3379                    .iter()
3380                    .map(|col| -> anyhow::Result<usize> {
3381                        let name_idx =
3382                            desc.get_by_name(col)
3383                                .map(|(idx, _type)| idx)
3384                                .ok_or_else(|| {
3385                                    sql_err!("column referenced in KEY does not exist: {}", col)
3386                                })?;
3387                        if desc.get_unambiguous_name(name_idx).is_none() {
3388                            return Err(sql_err!("column referenced in KEY is ambiguous: {}", col).into());
3389                        }
3390                        Ok(name_idx)
3391                    })
3392                    .collect::<Result<Vec<_>, _>>()?;
3393                let is_valid_key =
3394                    desc.typ().keys.iter().any(|key_columns| {
3395                        key_columns.iter().all(|column| indices.contains(column))
3396                    });
3397
3398                if !is_valid_key && envelope == SinkEnvelope::Upsert {
3399                    if key.not_enforced {
3400                        scx.catalog
3401                            .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3402                                key: key_columns.clone(),
3403                                name: name.item.clone(),
3404                            })
3405                    } else {
3406                        return Err(PlanError::UpsertSinkWithInvalidKey {
3407                            name: from_name.full_name_str(),
3408                            desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3409                            valid_keys: desc
3410                                .typ()
3411                                .keys
3412                                .iter()
3413                                .map(|key| {
3414                                    key.iter()
3415                                        .map(|col| desc.get_name(*col).as_str().into())
3416                                        .collect()
3417                                })
3418                                .collect(),
3419                        });
3420                    }
3421                }
3422                Some(indices)
3423            } else {
3424                None
3425            }
3426        }
3427    };
3428
3429    let headers_index = match &connection {
3430        CreateSinkConnection::Kafka {
3431            headers: Some(headers),
3432            ..
3433        } => {
3434            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3435
3436            match envelope {
3437                SinkEnvelope::Upsert => (),
3438                SinkEnvelope::Debezium => {
3439                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3440                }
3441            };
3442
3443            let headers = normalize::column_name(headers.clone());
3444            let (idx, ty) = desc
3445                .get_by_name(&headers)
3446                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3447
3448            if desc.get_unambiguous_name(idx).is_none() {
3449                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3450            }
3451
3452            match &ty.scalar_type {
3453                SqlScalarType::Map { value_type, .. }
3454                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3455                _ => sql_bail!(
3456                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3457                ),
3458            }
3459
3460            Some(idx)
3461        }
3462        _ => None,
3463    };
3464
3465    // Pick the first valid natural relation key, if any.
3466    let relation_key_indices = desc.typ().keys.get(0).cloned();
3467
3468    let key_desc_and_indices = key_indices.map(|key_indices| {
3469        let cols = desc
3470            .iter()
3471            .map(|(name, ty)| (name.clone(), ty.clone()))
3472            .collect::<Vec<_>>();
3473        let (names, types): (Vec<_>, Vec<_>) =
3474            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3475        let typ = SqlRelationType::new(types);
3476        (RelationDesc::new(typ, names), key_indices)
3477    });
3478
3479    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3480        return Err(PlanError::UpsertSinkWithoutKey);
3481    }
3482
3483    let connection_builder = match connection {
3484        CreateSinkConnection::Kafka {
3485            connection,
3486            options,
3487            ..
3488        } => kafka_sink_builder(
3489            scx,
3490            connection,
3491            options,
3492            format,
3493            relation_key_indices,
3494            key_desc_and_indices,
3495            headers_index,
3496            desc.into_owned(),
3497            envelope,
3498            from.id(),
3499        )?,
3500    };
3501
3502    let CreateSinkOptionExtracted {
3503        snapshot,
3504        version,
3505        partition_strategy: _,
3506        seen: _,
3507    } = with_options.try_into()?;
3508
3509    // WITH SNAPSHOT defaults to true
3510    let with_snapshot = snapshot.unwrap_or(true);
3511    // VERSION defaults to 0
3512    let version = version.unwrap_or(0);
3513
3514    // We will rewrite the cluster if one is not provided, so we must use the
3515    // `in_cluster` value we plan to normalize when we canonicalize the create
3516    // statement.
3517    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3518    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3519
3520    Ok(Plan::CreateSink(CreateSinkPlan {
3521        name,
3522        sink: Sink {
3523            create_sql,
3524            from: from.global_id(),
3525            connection: connection_builder,
3526            envelope,
3527            version,
3528        },
3529        with_snapshot,
3530        if_not_exists,
3531        in_cluster: in_cluster.id(),
3532    }))
3533}
3534
3535fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3536    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3537
3538    let existing_keys = desc
3539        .typ()
3540        .keys
3541        .iter()
3542        .map(|key_columns| {
3543            key_columns
3544                .iter()
3545                .map(|col| desc.get_name(*col).as_str())
3546                .join(", ")
3547        })
3548        .join(", ");
3549
3550    sql_err!(
3551        "Key constraint ({}) conflicts with existing key ({})",
3552        user_keys,
3553        existing_keys
3554    )
3555}
3556
3557/// Created by hand instead of via the generate_extracted_config! macro because
3558/// the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3559#[derive(Debug, Default, PartialEq, Clone)]
3560pub struct CsrConfigOptionExtracted {
3561    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3562    pub(crate) avro_key_fullname: Option<String>,
3563    pub(crate) avro_value_fullname: Option<String>,
3564    pub(crate) null_defaults: bool,
3565    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3566    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3567    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3568    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3569}
3570
3571impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3572    type Error = crate::plan::PlanError;
3573    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3574        let mut extracted = CsrConfigOptionExtracted::default();
3575        let mut common_doc_comments = BTreeMap::new();
3576        for option in v {
3577            if !extracted.seen.insert(option.name.clone()) {
3578                return Err(PlanError::Unstructured({
3579                    format!("{} specified more than once", option.name)
3580                }));
3581            }
3582            let option_name = option.name.clone();
3583            let option_name_str = option_name.to_ast_string_simple();
3584            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3585                option_name: option_name.to_ast_string_simple(),
3586                err: e.into(),
3587            };
3588            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3589                val.map(|s| match s {
3590                    WithOptionValue::Value(Value::String(s)) => {
3591                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3592                    }
3593                    _ => Err("must be a string".to_string()),
3594                })
3595                .transpose()
3596                .map_err(PlanError::Unstructured)
3597                .map_err(better_error)
3598            };
3599            match option.name {
3600                CsrConfigOptionName::AvroKeyFullname => {
3601                    extracted.avro_key_fullname =
3602                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3603                }
3604                CsrConfigOptionName::AvroValueFullname => {
3605                    extracted.avro_value_fullname =
3606                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3607                }
3608                CsrConfigOptionName::NullDefaults => {
3609                    extracted.null_defaults =
3610                        <bool>::try_from_value(option.value).map_err(better_error)?;
3611                }
3612                CsrConfigOptionName::AvroDocOn(doc_on) => {
3613                    let value = String::try_from_value(option.value.ok_or_else(|| {
3614                        PlanError::InvalidOptionValue {
3615                            option_name: option_name_str,
3616                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3617                        }
3618                    })?)
3619                    .map_err(better_error)?;
3620                    let key = match doc_on.identifier {
3621                        DocOnIdentifier::Column(ast::ColumnName {
3622                            relation: ResolvedItemName::Item { id, .. },
3623                            column: ResolvedColumnReference::Column { name, index: _ },
3624                        }) => DocTarget::Field {
3625                            object_id: id,
3626                            column_name: name,
3627                        },
3628                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3629                            DocTarget::Type(id)
3630                        }
3631                        _ => unreachable!(),
3632                    };
3633
3634                    match doc_on.for_schema {
3635                        DocOnSchema::KeyOnly => {
3636                            extracted.key_doc_options.insert(key, value);
3637                        }
3638                        DocOnSchema::ValueOnly => {
3639                            extracted.value_doc_options.insert(key, value);
3640                        }
3641                        DocOnSchema::All => {
3642                            common_doc_comments.insert(key, value);
3643                        }
3644                    }
3645                }
3646                CsrConfigOptionName::KeyCompatibilityLevel => {
3647                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3648                }
3649                CsrConfigOptionName::ValueCompatibilityLevel => {
3650                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3651                }
3652            }
3653        }
3654
3655        for (key, value) in common_doc_comments {
3656            if !extracted.key_doc_options.contains_key(&key) {
3657                extracted.key_doc_options.insert(key.clone(), value.clone());
3658            }
3659            if !extracted.value_doc_options.contains_key(&key) {
3660                extracted.value_doc_options.insert(key, value);
3661            }
3662        }
3663        Ok(extracted)
3664    }
3665}
3666
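/// Builds the Kafka sink connection from the resolved connection item and the
/// extracted `KafkaSinkConfigOption`s. As an illustrative (not exhaustive)
/// sketch, the option surface consumed here corresponds to SQL like:
///
/// ```sql
/// ... INTO KAFKA CONNECTION k (
///     TOPIC 'events',
///     TOPIC PARTITION COUNT 6,
///     TOPIC REPLICATION FACTOR 3,
///     TOPIC METADATA REFRESH INTERVAL '30s',
///     COMPRESSION TYPE 'lz4'
/// ) ...
/// ```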
3667fn kafka_sink_builder(
3668    scx: &StatementContext,
3669    connection: ResolvedItemName,
3670    options: Vec<KafkaSinkConfigOption<Aug>>,
3671    format: Option<FormatSpecifier<Aug>>,
3672    relation_key_indices: Option<Vec<usize>>,
3673    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3674    headers_index: Option<usize>,
3675    value_desc: RelationDesc,
3676    envelope: SinkEnvelope,
3677    sink_from: CatalogItemId,
3678) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3679    // Get Kafka connection.
3680    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3681    let connection_id = connection_item.id();
3682    match connection_item.connection()? {
3683        Connection::Kafka(_) => (),
3684        _ => sql_bail!(
3685            "{} is not a kafka connection",
3686            scx.catalog.resolve_full_name(connection_item.name())
3687        ),
3688    };
3689
3690    let KafkaSinkConfigOptionExtracted {
3691        topic,
3692        compression_type,
3693        partition_by,
3694        progress_group_id_prefix,
3695        transactional_id_prefix,
3696        legacy_ids,
3697        topic_config,
3698        topic_metadata_refresh_interval,
3699        topic_partition_count,
3700        topic_replication_factor,
3701        seen: _,
3702    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3703
3704    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3705        (Some(_), Some(true)) => {
3706            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3707        }
3708        (None, Some(true)) => KafkaIdStyle::Legacy,
3709        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3710    };
3711
3712    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3713        (Some(_), Some(true)) => {
3714            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3715        }
3716        (None, Some(true)) => KafkaIdStyle::Legacy,
3717        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3718    };
3719
3720    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3721
3722    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3723        // This is a librdkafka-enforced restriction that, if violated,
3724        // would result in a runtime error for the source.
3725        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3726    }
3727
3728    let assert_positive = |val: Option<i32>, name: &str| {
3729        if let Some(val) = val {
3730            if val <= 0 {
3731                sql_bail!("{} must be a positive integer", name);
3732            }
3733        }
3734        val.map(NonNeg::try_from)
3735            .transpose()
3736            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3737    };
3738    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3739    let topic_replication_factor =
3740        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3741
3742    // Helper closure to parse Avro CSR connection options for format specifiers
3743    // that use Avro for either key or value encoding.
3744    let gen_avro_schema_options = |conn| {
3745        let CsrConnectionAvro {
3746            connection:
3747                CsrConnection {
3748                    connection,
3749                    options,
3750                },
3751            seed,
3752            key_strategy,
3753            value_strategy,
3754        } = conn;
3755        if seed.is_some() {
3756            sql_bail!("SEED option does not make sense with sinks");
3757        }
3758        if key_strategy.is_some() {
3759            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3760        }
3761        if value_strategy.is_some() {
3762            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3763        }
3764
3765        let item = scx.get_item_by_resolved_name(&connection)?;
3766        let csr_connection = match item.connection()? {
3767            Connection::Csr(_) => item.id(),
3768            _ => {
3769                sql_bail!(
3770                    "{} is not a schema registry connection",
3771                    scx.catalog
3772                        .resolve_full_name(item.name())
3773                        .to_string()
3774                        .quoted()
3775                )
3776            }
3777        };
3778        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3779
3780        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3781            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3782        }
3783
3784        if key_desc_and_indices.is_some()
3785            && (extracted_options.avro_key_fullname.is_some()
3786                ^ extracted_options.avro_value_fullname.is_some())
3787        {
3788            sql_bail!(
3789                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3790            );
3791        }
3792
3793        Ok((csr_connection, extracted_options))
3794    };
3795
3796    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3797        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3798        Format::Bytes if desc.arity() == 1 => {
3799            let col_type = &desc.typ().column_types[0].scalar_type;
3800            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3801                bail_unsupported!(format!(
3802                    "BYTES format with non-encodable type: {:?}",
3803                    col_type
3804                ));
3805            }
3806
3807            Ok(KafkaSinkFormatType::Bytes)
3808        }
3809        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3810        Format::Bytes | Format::Text => {
3811            bail_unsupported!("BYTES or TEXT format with multiple columns")
3812        }
3813        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3814        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3815            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3816            let schema = if is_key {
3817                AvroSchemaGenerator::new(
3818                    desc.clone(),
3819                    false,
3820                    options.key_doc_options,
3821                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3822                    options.null_defaults,
3823                    Some(sink_from),
3824                    false,
3825                )?
3826                .schema()
3827                .to_string()
3828            } else {
3829                AvroSchemaGenerator::new(
3830                    desc.clone(),
3831                    matches!(envelope, SinkEnvelope::Debezium),
3832                    options.value_doc_options,
3833                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3834                    options.null_defaults,
3835                    Some(sink_from),
3836                    true,
3837                )?
3838                .schema()
3839                .to_string()
3840            };
3841            Ok(KafkaSinkFormatType::Avro {
3842                schema,
3843                compatibility_level: if is_key {
3844                    options.key_compatibility_level
3845                } else {
3846                    options.value_compatibility_level
3847                },
3848                csr_connection,
3849            })
3850        }
3851        format => bail_unsupported!(format!("sink format {:?}", format)),
3852    };
3853
3854    let partition_by = match &partition_by {
3855        Some(partition_by) => {
3856            let mut scope = Scope::from_source(None, value_desc.iter_names());
3857
3858            match envelope {
3859                SinkEnvelope::Upsert => (),
3860                SinkEnvelope::Debezium => {
3861                    let key_indices: HashSet<_> = key_desc_and_indices
3862                        .as_ref()
3863                        .map(|(_desc, indices)| indices.as_slice())
3864                        .unwrap_or_default()
3865                        .into_iter()
3866                        .collect();
3867                    for (i, item) in scope.items.iter_mut().enumerate() {
3868                        if !key_indices.contains(&i) {
3869                            item.error_if_referenced = Some(|_table, column| {
3870                                PlanError::InvalidPartitionByEnvelopeDebezium {
3871                                    column_name: column.to_string(),
3872                                }
3873                            });
3874                        }
3875                    }
3876                }
3877            };
3878
3879            let ecx = &ExprContext {
3880                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3881                name: "PARTITION BY",
3882                scope: &scope,
3883                relation_type: value_desc.typ(),
3884                allow_aggregates: false,
3885                allow_subqueries: false,
3886                allow_parameters: false,
3887                allow_windows: false,
3888            };
3889            let expr = plan_expr(ecx, partition_by)?.cast_to(
3890                ecx,
3891                CastContext::Assignment,
3892                &SqlScalarType::UInt64,
3893            )?;
3894            let expr = expr.lower_uncorrelated()?;
3895
3896            Some(expr)
3897        }
3898        _ => None,
3899    };
3900
3901    // Map from the format specifier of the statement to the individual key/value formats for the sink.
3902    let format = match format {
3903        Some(FormatSpecifier::KeyValue { key, value }) => {
3904            let key_format = match key_desc_and_indices.as_ref() {
3905                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
3906                None => None,
3907            };
3908            KafkaSinkFormat {
3909                value_format: map_format(value, &value_desc, false)?,
3910                key_format,
3911            }
3912        }
3913        Some(FormatSpecifier::Bare(format)) => {
3914            let key_format = match key_desc_and_indices.as_ref() {
3915                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
3916                None => None,
3917            };
3918            KafkaSinkFormat {
3919                value_format: map_format(format, &value_desc, false)?,
3920                key_format,
3921            }
3922        }
3923        None => bail_unsupported!("sink without format"),
3924    };
3925
3926    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
3927        connection_id,
3928        connection: connection_id,
3929        format,
3930        topic: topic_name,
3931        relation_key_indices,
3932        key_desc_and_indices,
3933        headers_index,
3934        value_desc,
3935        partition_by,
3936        compression_type,
3937        progress_group_id,
3938        transactional_id,
3939        topic_options: KafkaTopicOptions {
3940            partition_count: topic_partition_count,
3941            replication_factor: topic_replication_factor,
3942            topic_config: topic_config.unwrap_or_default(),
3943        },
3944        topic_metadata_refresh_interval,
3945    }))
3946}
3947
3948pub fn describe_create_index(
3949    _: &StatementContext,
3950    _: CreateIndexStatement<Aug>,
3951) -> Result<StatementDesc, PlanError> {
3952    Ok(StatementDesc::new(None))
3953}
3954
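/// Plans a `CREATE INDEX` statement.
///
/// When no index name is given, one is derived from the indexed relation and
/// key, following the PostgreSQL convention implemented below; e.g.
/// (illustrative) `CREATE INDEX ON t (a, b)` yields `t_a_b_idx`, and a
/// default index (no key given) yields `t_primary_idx`.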
3955pub fn plan_create_index(
3956    scx: &StatementContext,
3957    mut stmt: CreateIndexStatement<Aug>,
3958) -> Result<Plan, PlanError> {
3959    let CreateIndexStatement {
3960        name,
3961        on_name,
3962        in_cluster,
3963        key_parts,
3964        with_options,
3965        if_not_exists,
3966    } = &mut stmt;
3967    let on = scx.get_item_by_resolved_name(on_name)?;
3968    let on_item_type = on.item_type();
3969
3970    if !matches!(
3971        on_item_type,
3972        CatalogItemType::View
3973            | CatalogItemType::MaterializedView
3974            | CatalogItemType::Source
3975            | CatalogItemType::Table
3976    ) {
3977        sql_bail!(
3978            "index cannot be created on {} because it is a {}",
3979            on_name.full_name_str(),
3980            on.item_type()
3981        )
3982    }
3983
3984    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
3985
3986    let filled_key_parts = match key_parts {
3987        Some(kp) => kp.to_vec(),
3988        None => {
3989            // `key_parts` is None if we're creating a "default" index.
3990            on.desc(&scx.catalog.resolve_full_name(on.name()))?
3991                .typ()
3992                .default_key()
3993                .iter()
3994                .map(|i| match on_desc.get_unambiguous_name(*i) {
3995                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
3996                    _ => Expr::Value(Value::Number((i + 1).to_string())),
3997                })
3998                .collect()
3999        }
4000    };
4001    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4002
4003    let index_name = if let Some(name) = name {
4004        QualifiedItemName {
4005            qualifiers: on.name().qualifiers.clone(),
4006            item: normalize::ident(name.clone()),
4007        }
4008    } else {
4009        let mut idx_name = QualifiedItemName {
4010            qualifiers: on.name().qualifiers.clone(),
4011            item: on.name().item.clone(),
4012        };
4013        if key_parts.is_none() {
4014            // We're trying to create the "default" index.
4015            idx_name.item += "_primary_idx";
4016        } else {
4017            // Use PostgreSQL's scheme for automatically naming indexes:
4018            // `<table>_<_-separated indexed expressions>_idx`
4019            let index_name_col_suffix = keys
4020                .iter()
4021                .map(|k| match k {
4022                    mz_expr::MirScalarExpr::Column(i, name) => {
4023                        match (on_desc.get_unambiguous_name(*i), &name.0) {
4024                            (Some(col_name), _) => col_name.to_string(),
4025                            (None, Some(name)) => name.to_string(),
4026                            (None, None) => format!("{}", i + 1),
4027                        }
4028                    }
4029                    _ => "expr".to_string(),
4030                })
4031                .join("_");
4032            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4033                .expect("write on strings cannot fail");
4034            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4035        }
4036
4037        if !*if_not_exists {
4038            scx.catalog.find_available_name(idx_name)
4039        } else {
4040            idx_name
4041        }
4042    };
4043
4044    // Check for an object in the catalog with this same name
4045    let full_name = scx.catalog.resolve_full_name(&index_name);
4046    let partial_name = PartialItemName::from(full_name.clone());
4047    // For PostgreSQL compatibility, we need to prevent creating indexes when
4048    // there is an existing object *or* type of the same name.
4049    //
4050    // Technically, we only need to prevent coexistence of indexes and types
4051    // that have an associated relation (record types but not list/map types).
4052    // Enforcing that would be more complicated, though. It's backwards
4053    // compatible to weaken this restriction in the future.
4054    if let (Ok(item), false, false) = (
4055        scx.catalog.resolve_item_or_type(&partial_name),
4056        *if_not_exists,
4057        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4058    ) {
4059        return Err(PlanError::ItemAlreadyExists {
4060            name: full_name.to_string(),
4061            item_type: item.item_type(),
4062        });
4063    }
4064
4065    let options = plan_index_options(scx, with_options.clone())?;
4066    let cluster_id = match in_cluster {
4067        None => scx.resolve_cluster(None)?.id(),
4068        Some(in_cluster) => in_cluster.id,
4069    };
4070
4071    *in_cluster = Some(ResolvedClusterName {
4072        id: cluster_id,
4073        print_name: None,
4074    });
4075
4076    // Normalize `stmt`.
4077    *name = Some(Ident::new(index_name.item.clone())?);
4078    *key_parts = Some(filled_key_parts);
4079    let if_not_exists = *if_not_exists;
4080
4081    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4082    let compaction_window = options.iter().find_map(|o| {
4083        #[allow(irrefutable_let_patterns)]
4084        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4085            Some(lcw.clone())
4086        } else {
4087            None
4088        }
4089    });
4090
4091    Ok(Plan::CreateIndex(CreateIndexPlan {
4092        name: index_name,
4093        index: Index {
4094            create_sql,
4095            on: on.global_id(),
4096            keys,
4097            cluster_id,
4098            compaction_window,
4099        },
4100        if_not_exists,
4101    }))
4102}
4103
4104pub fn describe_create_type(
4105    _: &StatementContext,
4106    _: CreateTypeStatement<Aug>,
4107) -> Result<StatementDesc, PlanError> {
4108    Ok(StatementDesc::new(None))
4109}
4110
4111pub fn plan_create_type(
4112    scx: &StatementContext,
4113    stmt: CreateTypeStatement<Aug>,
4114) -> Result<Plan, PlanError> {
4115    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4116    let CreateTypeStatement { name, as_type, .. } = stmt;
4117
4118    fn validate_data_type(
4119        scx: &StatementContext,
4120        data_type: ResolvedDataType,
4121        as_type: &str,
4122        key: &str,
4123    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4124        let (id, modifiers) = match data_type {
4125            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4126            _ => sql_bail!(
4127                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4128                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4129                as_type,
4130                key,
4131                data_type.human_readable_name(),
4132            ),
4133        };
4134
4135        let item = scx.catalog.get_item(&id);
4136        match item.type_details() {
4137            None => sql_bail!(
4138                "{} must be of class type, but received {} which is of class {}",
4139                key,
4140                scx.catalog.resolve_full_name(item.name()),
4141                item.item_type()
4142            ),
4143            Some(CatalogTypeDetails {
4144                typ: CatalogType::Char,
4145                ..
4146            }) => {
4147                bail_unsupported!("embedding char type in a list or map")
4148            }
4149            _ => {
4150                // Validate that the modifiers are actually valid.
4151                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4152
4153                Ok((id, modifiers))
4154            }
4155        }
4156    }
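    // For example (illustrative): `CREATE TYPE t AS LIST (ELEMENT TYPE = char)`
    // is rejected by `validate_data_type` because embedding `char` in a list or
    // map is unsupported, and an anonymous type expression must first be given
    // a name via `CREATE TYPE` before it can serve as an element, key, or
    // value type.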
4157
4158    let inner = match as_type {
4159        CreateTypeAs::List { options } => {
4160            let CreateTypeListOptionExtracted {
4161                element_type,
4162                seen: _,
4163            } = CreateTypeListOptionExtracted::try_from(options)?;
4164            let element_type =
4165                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4166            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4167            CatalogType::List {
4168                element_reference: id,
4169                element_modifiers: modifiers,
4170            }
4171        }
4172        CreateTypeAs::Map { options } => {
4173            let CreateTypeMapOptionExtracted {
4174                key_type,
4175                value_type,
4176                seen: _,
4177            } = CreateTypeMapOptionExtracted::try_from(options)?;
4178            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4179            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4180            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4181            let (value_id, value_modifiers) =
4182                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4183            CatalogType::Map {
4184                key_reference: key_id,
4185                key_modifiers,
4186                value_reference: value_id,
4187                value_modifiers,
4188            }
4189        }
4190        CreateTypeAs::Record { column_defs } => {
4191            let mut fields = vec![];
4192            for column_def in column_defs {
4193                let data_type = column_def.data_type;
4194                let key = ident(column_def.name.clone());
4195                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4196                fields.push(CatalogRecordField {
4197                    name: ColumnName::from(key.clone()),
4198                    type_reference: id,
4199                    type_modifiers: modifiers,
4200                });
4201            }
4202            CatalogType::Record { fields }
4203        }
4204    };
4205
4206    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4207
4208    // Check for an object in the catalog with this same name
4209    let full_name = scx.catalog.resolve_full_name(&name);
4210    let partial_name = PartialItemName::from(full_name.clone());
4211    // For PostgreSQL compatibility, we need to prevent creating types when
4212    // there is an existing object *or* type of the same name.
4213    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4214        if item.item_type().conflicts_with_type() {
4215            return Err(PlanError::ItemAlreadyExists {
4216                name: full_name.to_string(),
4217                item_type: item.item_type(),
4218            });
4219        }
4220    }
4221
4222    Ok(Plan::CreateType(CreateTypePlan {
4223        name,
4224        typ: Type { create_sql, inner },
4225    }))
4226}
4227
4228generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4229
4230generate_extracted_config!(
4231    CreateTypeMapOption,
4232    (KeyType, ResolvedDataType),
4233    (ValueType, ResolvedDataType)
4234);
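// A sketch of what `generate_extracted_config!` produces for the map options
// above, inferred from the usage in `plan_create_type` (the real macro output
// may differ in its details):
//
//     struct CreateTypeMapOptionExtracted {
//         seen: BTreeSet<CreateTypeMapOptionName>,
//         key_type: Option<ResolvedDataType>,
//         value_type: Option<ResolvedDataType>,
//     }
//
// along with a `TryFrom<Vec<CreateTypeMapOption>>` impl that populates the
// fields and rejects options that are specified more than once.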
4235
4236#[derive(Debug)]
4237pub enum PlannedAlterRoleOption {
4238    Attributes(PlannedRoleAttributes),
4239    Variable(PlannedRoleVariable),
4240}
4241
4242#[derive(Debug, Clone)]
4243pub struct PlannedRoleAttributes {
4244    pub inherit: Option<bool>,
4245    pub password: Option<Password>,
4246    /// `nopassword` is set to true if the password from the parser is None.
4247    /// This is semantically different from not supplying a password at all,
4248    /// and allows for unsetting a password.
4249    pub nopassword: Option<bool>,
4250    pub superuser: Option<bool>,
4251    pub login: Option<bool>,
4252}
4253
4254fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4255    let mut planned_attributes = PlannedRoleAttributes {
4256        inherit: None,
4257        password: None,
4258        superuser: None,
4259        login: None,
4260        nopassword: None,
4261    };
4262
4263    for option in options {
4264        match option {
4265            RoleAttribute::Inherit | RoleAttribute::NoInherit
4266                if planned_attributes.inherit.is_some() =>
4267            {
4268                sql_bail!("conflicting or redundant options");
4269            }
4270            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4271                bail_never_supported!(
4272                    "CREATECLUSTER attribute",
4273                    "sql/create-role/#details",
4274                    "Use system privileges instead."
4275                );
4276            }
4277            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4278                bail_never_supported!(
4279                    "CREATEDB attribute",
4280                    "sql/create-role/#details",
4281                    "Use system privileges instead."
4282                );
4283            }
4284            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4285                bail_never_supported!(
4286                    "CREATEROLE attribute",
4287                    "sql/create-role/#details",
4288                    "Use system privileges instead."
4289                );
4290            }
4291            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4292                sql_bail!("conflicting or redundant options");
4293            }
4294
4295            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4296            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4297            RoleAttribute::Password(password) => {
4298                if let Some(password) = password {
4299                    planned_attributes.password = Some(password.into());
4300                } else {
4301                    planned_attributes.nopassword = Some(true);
4302                }
4303            }
4304            RoleAttribute::SuperUser => {
4305                if planned_attributes.superuser == Some(false) {
4306                    sql_bail!("conflicting or redundant options");
4307                }
4308                planned_attributes.superuser = Some(true);
4309            }
4310            RoleAttribute::NoSuperUser => {
4311                if planned_attributes.superuser == Some(true) {
4312                    sql_bail!("conflicting or redundant options");
4313                }
4314                planned_attributes.superuser = Some(false);
4315            }
4316            RoleAttribute::Login => {
4317                if planned_attributes.login == Some(false) {
4318                    sql_bail!("conflicting or redundant options");
4319                }
4320                planned_attributes.login = Some(true);
4321            }
4322            RoleAttribute::NoLogin => {
4323                if planned_attributes.login == Some(true) {
4324                    sql_bail!("conflicting or redundant options");
4325                }
4326                planned_attributes.login = Some(false);
4327            }
4328        }
4329    }
4330    if planned_attributes.inherit == Some(false) {
4331        bail_unsupported!("non-inherit roles");
4332    }
4333
4334    Ok(planned_attributes)
4335}
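// For example (illustrative): `CREATE ROLE r SUPERUSER NOSUPERUSER` fails with
// "conflicting or redundant options"; `CREATE ROLE r PASSWORD NULL` sets
// `nopassword` rather than `password`; and `CREATE ROLE r NOINHERIT` is
// rejected because non-inheriting roles are unsupported.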
4336
4337#[derive(Debug)]
4338pub enum PlannedRoleVariable {
4339    Set { name: String, value: VariableValue },
4340    Reset { name: String },
4341}
4342
4343impl PlannedRoleVariable {
4344    pub fn name(&self) -> &str {
4345        match self {
4346            PlannedRoleVariable::Set { name, .. } => name,
4347            PlannedRoleVariable::Reset { name } => name,
4348        }
4349    }
4350}
4351
4352fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4353    let plan = match variable {
4354        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4355            name: name.to_string(),
4356            value: scl::plan_set_variable_to(value)?,
4357        },
4358        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4359            name: name.to_string(),
4360        },
4361    };
4362    Ok(plan)
4363}
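// For example (illustrative): `ALTER ROLE r SET cluster TO 'batch'` plans to
// `PlannedRoleVariable::Set { name: "cluster", .. }`, while
// `ALTER ROLE r RESET cluster` plans to `PlannedRoleVariable::Reset`.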
4364
4365pub fn describe_create_role(
4366    _: &StatementContext,
4367    _: CreateRoleStatement,
4368) -> Result<StatementDesc, PlanError> {
4369    Ok(StatementDesc::new(None))
4370}
4371
4372pub fn plan_create_role(
4373    _: &StatementContext,
4374    CreateRoleStatement { name, options }: CreateRoleStatement,
4375) -> Result<Plan, PlanError> {
4376    let attributes = plan_role_attributes(options)?;
4377    Ok(Plan::CreateRole(CreateRolePlan {
4378        name: normalize::ident(name),
4379        attributes: attributes.into(),
4380    }))
4381}
4382
4383pub fn plan_create_network_policy(
4384    ctx: &StatementContext,
4385    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4386) -> Result<Plan, PlanError> {
4387    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4388    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4389
4390    let Some(rule_defs) = policy_options.rules else {
4391        sql_bail!("RULES must be specified when creating network policies.");
4392    };
4393
4394    let mut rules = vec![];
4395    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4396        let NetworkPolicyRuleOptionExtracted {
4397            seen: _,
4398            direction,
4399            action,
4400            address,
4401        } = options.try_into()?;
4402        let (direction, action, address) = match (direction, action, address) {
4403            (Some(direction), Some(action), Some(address)) => (
4404                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4405                NetworkPolicyRuleAction::try_from(action.as_str())?,
4406                PolicyAddress::try_from(address.as_str())?,
4407            ),
4408            (_, _, _) => {
4409                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4410            }
4411        };
4412        rules.push(NetworkPolicyRule {
4413            name: normalize::ident(name),
4414            direction,
4415            action,
4416            address,
4417        });
4418    }
4419
4420    if rules.len()
4421        > ctx
4422            .catalog
4423            .system_vars()
4424            .max_rules_per_network_policy()
4425            .try_into()?
4426    {
4427        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4428    }
4429
4430    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4431        name: normalize::ident(name),
4432        rules,
4433    }))
4434}
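// For example (a sketch; the exact option syntax is inferred from the rule
// options above): a policy with a single ingress rule might look like
//
//     CREATE NETWORK POLICY office_access (
//         RULES (
//             hq (DIRECTION = 'ingress', ACTION = 'allow', ADDRESS = '1.2.3.4/28')
//         )
//     );
//
// where every rule must supply all three of DIRECTION, ACTION, and ADDRESS.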
4435
4436pub fn plan_alter_network_policy(
4437    ctx: &StatementContext,
4438    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4439) -> Result<Plan, PlanError> {
4440    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4441
4442    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4443    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4444
4445    let Some(rule_defs) = policy_options.rules else {
4446        sql_bail!("RULES must be specified when altering network policies.");
4447    };
4448
4449    let mut rules = vec![];
4450    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4451        let NetworkPolicyRuleOptionExtracted {
4452            seen: _,
4453            direction,
4454            action,
4455            address,
4456        } = options.try_into()?;
4457
4458        let (direction, action, address) = match (direction, action, address) {
4459            (Some(direction), Some(action), Some(address)) => (
4460                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4461                NetworkPolicyRuleAction::try_from(action.as_str())?,
4462                PolicyAddress::try_from(address.as_str())?,
4463            ),
4464            (_, _, _) => {
4465                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4466            }
4467        };
4468        rules.push(NetworkPolicyRule {
4469            name: normalize::ident(name),
4470            direction,
4471            action,
4472            address,
4473        });
4474    }
4475    if rules.len()
4476        > ctx
4477            .catalog
4478            .system_vars()
4479            .max_rules_per_network_policy()
4480            .try_into()?
4481    {
4482        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4483    }
4484
4485    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4486        id: policy.id(),
4487        name: normalize::ident(name),
4488        rules,
4489    }))
4490}
4491
4492pub fn describe_create_cluster(
4493    _: &StatementContext,
4494    _: CreateClusterStatement<Aug>,
4495) -> Result<StatementDesc, PlanError> {
4496    Ok(StatementDesc::new(None))
4497}
4498
4499// WARNING:
4500// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4501// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4502// that option stays the same. If you were to give a default value here, then not giving that option
4503// to ALTER CLUSTER would always reset the value of that option to the default.
4504generate_extracted_config!(
4505    ClusterOption,
4506    (AvailabilityZones, Vec<String>),
4507    (Disk, bool),
4508    (IntrospectionDebugging, bool),
4509    (IntrospectionInterval, OptionalDuration),
4510    (Managed, bool),
4511    (Replicas, Vec<ReplicaDefinition<Aug>>),
4512    (ReplicationFactor, u32),
4513    (Size, String),
4514    (Schedule, ClusterScheduleOptionValue),
4515    (WorkloadClass, OptionalString)
4516);
4517
4518generate_extracted_config!(
4519    NetworkPolicyOption,
4520    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4521);
4522
4523generate_extracted_config!(
4524    NetworkPolicyRuleOption,
4525    (Direction, String),
4526    (Action, String),
4527    (Address, String)
4528);
4529
4530generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4531
4532generate_extracted_config!(
4533    ClusterAlterUntilReadyOption,
4534    (Timeout, Duration),
4535    (OnTimeout, String)
4536);
4537
4538generate_extracted_config!(
4539    ClusterFeature,
4540    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4541    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4542    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4543    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4544    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4545    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4546    (
4547        EnableProjectionPushdownAfterRelationCse,
4548        Option<bool>,
4549        Default(None)
4550    )
4551);
4552
4553/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4554///
4555/// The reverse of [`unplan_create_cluster`].
4556pub fn plan_create_cluster(
4557    scx: &StatementContext,
4558    stmt: CreateClusterStatement<Aug>,
4559) -> Result<Plan, PlanError> {
4560    let plan = plan_create_cluster_inner(scx, stmt)?;
4561
4562    // Roundtrip through unplan and make sure that we end up with the same plan.
4563    if let CreateClusterVariant::Managed(_) = &plan.variant {
4564        let stmt = unplan_create_cluster(scx, plan.clone())
4565            .map_err(|e| PlanError::Replan(e.to_string()))?;
4566        let create_sql = stmt.to_ast_string_stable();
4567        let stmt = parse::parse(&create_sql)
4568            .map_err(|e| PlanError::Replan(e.to_string()))?
4569            .into_element()
4570            .ast;
4571        let (stmt, _resolved_ids) =
4572            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4573        let stmt = match stmt {
4574            Statement::CreateCluster(stmt) => stmt,
4575            stmt => {
4576                return Err(PlanError::Replan(format!(
4577                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4578                )));
4579            }
4580        };
4581        let replan =
4582            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4583        if plan != replan {
4584            return Err(PlanError::Replan(format!(
4585                "replan does not match: plan={plan:?}, replan={replan:?}"
4586            )));
4587        }
4588    }
4589
4590    Ok(Plan::CreateCluster(plan))
4591}
4592
4593pub fn plan_create_cluster_inner(
4594    scx: &StatementContext,
4595    CreateClusterStatement {
4596        name,
4597        options,
4598        features,
4599    }: CreateClusterStatement<Aug>,
4600) -> Result<CreateClusterPlan, PlanError> {
4601    let ClusterOptionExtracted {
4602        availability_zones,
4603        introspection_debugging,
4604        introspection_interval,
4605        managed,
4606        replicas,
4607        replication_factor,
4608        seen: _,
4609        size,
4610        disk,
4611        schedule,
4612        workload_class,
4613    }: ClusterOptionExtracted = options.try_into()?;
4614
4615    let managed = managed.unwrap_or_else(|| replicas.is_none());
4616
4617    if !scx.catalog.active_role_id().is_system() {
4618        if !features.is_empty() {
4619            sql_bail!("FEATURES not supported for non-system users");
4620        }
4621        if workload_class.is_some() {
4622            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4623        }
4624    }
4625
4626    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4627    let workload_class = workload_class.and_then(|v| v.0);
4628
4629    if managed {
4630        if replicas.is_some() {
4631            sql_bail!("REPLICAS not supported for managed clusters");
4632        }
4633        let Some(size) = size else {
4634            sql_bail!("SIZE must be specified for managed clusters");
4635        };
4636
4637        if disk.is_some() {
4638            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4639            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4640            // we'll be able to remove the `DISK` option entirely.
4641            if scx.catalog.is_cluster_size_cc(&size) {
4642                sql_bail!(
4643                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4644                );
4645            }
4646
4647            scx.catalog
4648                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4649        }
4650
4651        let compute = plan_compute_replica_config(
4652            introspection_interval,
4653            introspection_debugging.unwrap_or(false),
4654        )?;
4655
4656        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4657            replication_factor.unwrap_or_else(|| {
4658                scx.catalog
4659                    .system_vars()
4660                    .default_cluster_replication_factor()
4661            })
4662        } else {
4663            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4664            if replication_factor.is_some() {
4665                sql_bail!(
4666                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4667                );
4668            }
4669            // If we have a non-trivial schedule, then let's not have any replicas initially,
4670            // to avoid quickly going back and forth if the schedule doesn't want a replica
4671            // initially.
4672            0
4673        };
4674        let availability_zones = availability_zones.unwrap_or_default();
4675
4676        if !availability_zones.is_empty() {
4677            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4678        }
4679
4680        // Plan OptimizerFeatureOverrides.
4681        let ClusterFeatureExtracted {
4682            reoptimize_imported_views,
4683            enable_eager_delta_joins,
4684            enable_new_outer_join_lowering,
4685            enable_variadic_left_join_lowering,
4686            enable_letrec_fixpoint_analysis,
4687            enable_join_prioritize_arranged,
4688            enable_projection_pushdown_after_relation_cse,
4689            seen: _,
4690        } = ClusterFeatureExtracted::try_from(features)?;
4691        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4692            reoptimize_imported_views,
4693            enable_eager_delta_joins,
4694            enable_new_outer_join_lowering,
4695            enable_variadic_left_join_lowering,
4696            enable_letrec_fixpoint_analysis,
4697            enable_join_prioritize_arranged,
4698            enable_projection_pushdown_after_relation_cse,
4699            ..Default::default()
4700        };
4701
4702        let schedule = plan_cluster_schedule(schedule)?;
4703
4704        Ok(CreateClusterPlan {
4705            name: normalize::ident(name),
4706            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4707                replication_factor,
4708                size,
4709                availability_zones,
4710                compute,
4711                optimizer_feature_overrides,
4712                schedule,
4713            }),
4714            workload_class,
4715        })
4716    } else {
4717        let Some(replica_defs) = replicas else {
4718            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4719        };
4720        if availability_zones.is_some() {
4721            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4722        }
4723        if replication_factor.is_some() {
4724            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4725        }
4726        if introspection_debugging.is_some() {
4727            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4728        }
4729        if introspection_interval.is_some() {
4730            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4731        }
4732        if size.is_some() {
4733            sql_bail!("SIZE not supported for unmanaged clusters");
4734        }
4735        if disk.is_some() {
4736            sql_bail!("DISK not supported for unmanaged clusters");
4737        }
4738        if !features.is_empty() {
4739            sql_bail!("FEATURES not supported for unmanaged clusters");
4740        }
4741        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4742            sql_bail!(
4743                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4744            );
4745        }
4746
4747        let mut replicas = vec![];
4748        for ReplicaDefinition { name, options } in replica_defs {
4749            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4750        }
4751
4752        Ok(CreateClusterPlan {
4753            name: normalize::ident(name),
4754            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4755            workload_class,
4756        })
4757    }
4758}
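// For example (illustrative): a managed cluster is created with something like
//
//     CREATE CLUSTER c (SIZE = '100cc', REPLICATION FACTOR = 2);
//
// while an unmanaged cluster instead enumerates its replicas explicitly:
//
//     CREATE CLUSTER c (REPLICAS (r1 (SIZE = '100cc'), r2 (SIZE = '100cc')));
//
// When MANAGED is omitted, it is inferred from the absence or presence of the
// REPLICAS option.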
4759
4760/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4761///
4762/// The reverse of [`plan_create_cluster`].
4763pub fn unplan_create_cluster(
4764    scx: &StatementContext,
4765    CreateClusterPlan {
4766        name,
4767        variant,
4768        workload_class,
4769    }: CreateClusterPlan,
4770) -> Result<CreateClusterStatement<Aug>, PlanError> {
4771    match variant {
4772        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4773            replication_factor,
4774            size,
4775            availability_zones,
4776            compute,
4777            optimizer_feature_overrides,
4778            schedule,
4779        }) => {
4780            let schedule = unplan_cluster_schedule(schedule);
4781            let OptimizerFeatureOverrides {
4782                enable_guard_subquery_tablefunc: _,
4783                enable_consolidate_after_union_negate: _,
4784                enable_reduce_mfp_fusion: _,
4785                enable_cardinality_estimates: _,
4786                persist_fast_path_limit: _,
4787                reoptimize_imported_views,
4788                enable_eager_delta_joins,
4789                enable_new_outer_join_lowering,
4790                enable_variadic_left_join_lowering,
4791                enable_letrec_fixpoint_analysis,
4792                enable_reduce_reduction: _,
4793                enable_join_prioritize_arranged,
4794                enable_projection_pushdown_after_relation_cse,
4795                enable_less_reduce_in_eqprop: _,
4796                enable_dequadratic_eqprop_map: _,
4797                enable_eq_classes_withholding_errors: _,
4798                enable_fast_path_plan_insights: _,
4799            } = optimizer_feature_overrides;
4800            // The ones from above that don't occur below are not wired up to cluster features.
4801            let features_extracted = ClusterFeatureExtracted {
4802                // Seen is ignored when unplanning.
4803                seen: Default::default(),
4804                reoptimize_imported_views,
4805                enable_eager_delta_joins,
4806                enable_new_outer_join_lowering,
4807                enable_variadic_left_join_lowering,
4808                enable_letrec_fixpoint_analysis,
4809                enable_join_prioritize_arranged,
4810                enable_projection_pushdown_after_relation_cse,
4811            };
4812            let features = features_extracted.into_values(scx.catalog);
4813            let availability_zones = if availability_zones.is_empty() {
4814                None
4815            } else {
4816                Some(availability_zones)
4817            };
4818            let (introspection_interval, introspection_debugging) =
4819                unplan_compute_replica_config(compute);
4820            // Replication factor cannot be explicitly specified with a refresh
4821            // schedule; it's always 1 or less.
4822            let replication_factor = match &schedule {
4823                ClusterScheduleOptionValue::Manual => Some(replication_factor),
4824                ClusterScheduleOptionValue::Refresh { .. } => {
4825                    assert!(
4826                        replication_factor <= 1,
4827                        "replication factor, {replication_factor:?}, must be <= 1"
4828                    );
4829                    None
4830                }
4831            };
4832            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4833            let options_extracted = ClusterOptionExtracted {
4834                // Seen is ignored when unplanning.
4835                seen: Default::default(),
4836                availability_zones,
4837                disk: None,
4838                introspection_debugging: Some(introspection_debugging),
4839                introspection_interval,
4840                managed: Some(true),
4841                replicas: None,
4842                replication_factor,
4843                size: Some(size),
4844                schedule: Some(schedule),
4845                workload_class,
4846            };
4847            let options = options_extracted.into_values(scx.catalog);
4848            let name = Ident::new_unchecked(name);
4849            Ok(CreateClusterStatement {
4850                name,
4851                options,
4852                features,
4853            })
4854        }
4855        CreateClusterVariant::Unmanaged(_) => {
4856            bail_unsupported!("SHOW CREATE for unmanaged clusters")
4857        }
4858    }
4859}
4860
4861generate_extracted_config!(
4862    ReplicaOption,
4863    (AvailabilityZone, String),
4864    (BilledAs, String),
4865    (ComputeAddresses, Vec<String>),
4866    (ComputectlAddresses, Vec<String>),
4867    (Disk, bool),
4868    (Internal, bool, Default(false)),
4869    (IntrospectionDebugging, bool, Default(false)),
4870    (IntrospectionInterval, OptionalDuration),
4871    (Size, String),
4872    (StorageAddresses, Vec<String>),
4873    (StoragectlAddresses, Vec<String>),
4874    (Workers, u16)
4875);
4876
4877fn plan_replica_config(
4878    scx: &StatementContext,
4879    options: Vec<ReplicaOption<Aug>>,
4880) -> Result<ReplicaConfig, PlanError> {
4881    let ReplicaOptionExtracted {
4882        availability_zone,
4883        billed_as,
4884        computectl_addresses,
4885        disk,
4886        internal,
4887        introspection_debugging,
4888        introspection_interval,
4889        size,
4890        storagectl_addresses,
4891        ..
4892    }: ReplicaOptionExtracted = options.try_into()?;
4893
4894    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4895
4896    match (
4897        size,
4898        availability_zone,
4899        billed_as,
4900        storagectl_addresses,
4901        computectl_addresses,
4902    ) {
4903        // Common cases we expect end users to hit.
4904        (None, _, None, None, None) => {
4905            // We don't mention the unmanaged options in the error message
4906            // because they are only available in unsafe mode.
4907            sql_bail!("SIZE option must be specified");
4908        }
4909        (Some(size), availability_zone, billed_as, None, None) => {
4910            if disk.is_some() {
4911                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4912                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4913                // we'll be able to remove the `DISK` option entirely.
4914                if scx.catalog.is_cluster_size_cc(&size) {
4915                    sql_bail!(
4916                        "DISK option not supported for modern cluster sizes because disk is always enabled"
4917                    );
4918                }
4919
4920                scx.catalog
4921                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4922            }
4923
4924            Ok(ReplicaConfig::Orchestrated {
4925                size,
4926                availability_zone,
4927                compute,
4928                billed_as,
4929                internal,
4930            })
4931        }
4932
4933        (None, None, None, storagectl_addresses, computectl_addresses) => {
4934            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
4935
4936            // When manually testing Materialize in unsafe mode, it's easy to
4937            // accidentally omit one of these options, so we try to produce
4938            // helpful error messages.
4939            let Some(storagectl_addrs) = storagectl_addresses else {
4940                sql_bail!("missing STORAGECTL ADDRESSES option");
4941            };
4942            let Some(computectl_addrs) = computectl_addresses else {
4943                sql_bail!("missing COMPUTECTL ADDRESSES option");
4944            };
4945
4946            if storagectl_addrs.len() != computectl_addrs.len() {
4947                sql_bail!(
4948                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
4949                );
4950            }
4951
4952            if disk.is_some() {
4953                sql_bail!("DISK can't be specified for unorchestrated clusters");
4954            }
4955
4956            Ok(ReplicaConfig::Unorchestrated {
4957                storagectl_addrs,
4958                computectl_addrs,
4959                compute,
4960            })
4961        }
4962        _ => {
4963            // We don't bother trying to produce a more helpful error message
4964            // here because no user is likely to hit this path.
4965            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
4966        }
4967    }
4968}
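// For example (illustrative): the orchestrated arm corresponds to something
// like `CREATE CLUSTER REPLICA c.r1 (SIZE = '100cc')`, while the
// unorchestrated arm (unsafe mode only) supplies explicit process addresses,
// e.g. `STORAGECTL ADDRESSES ['host:2100']` and `COMPUTECTL ADDRESSES
// ['host:2101']`, which must have the same length.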
4969
4970/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
4971///
4972/// The reverse of [`unplan_compute_replica_config`].
4973fn plan_compute_replica_config(
4974    introspection_interval: Option<OptionalDuration>,
4975    introspection_debugging: bool,
4976) -> Result<ComputeReplicaConfig, PlanError> {
4977    let introspection_interval = introspection_interval
4978        .map(|OptionalDuration(i)| i)
4979        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
4980    let introspection = match introspection_interval {
4981        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
4982            interval,
4983            debugging: introspection_debugging,
4984        }),
4985        None if introspection_debugging => {
4986            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
4987        }
4988        None => None,
4989    };
4990    let compute = ComputeReplicaConfig { introspection };
4991    Ok(compute)
4992}
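// Note (summarizing the logic above): omitting INTROSPECTION INTERVAL defaults
// it to DEFAULT_REPLICA_LOGGING_INTERVAL, and INTROSPECTION DEBUGGING is only
// accepted while an introspection interval is in effect.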
4993
4994/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
4995///
4996/// The reverse of [`plan_compute_replica_config`].
4997fn unplan_compute_replica_config(
4998    compute_replica_config: ComputeReplicaConfig,
4999) -> (Option<OptionalDuration>, bool) {
5000    match compute_replica_config.introspection {
5001        Some(ComputeReplicaIntrospectionConfig {
5002            debugging,
5003            interval,
5004        }) => (Some(OptionalDuration(Some(interval))), debugging),
5005        None => (Some(OptionalDuration(None)), false),
5006    }
5007}
5008
5009/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5010///
5011/// The reverse of [`unplan_cluster_schedule`].
5012fn plan_cluster_schedule(
5013    schedule: ClusterScheduleOptionValue,
5014) -> Result<ClusterSchedule, PlanError> {
5015    Ok(match schedule {
5016        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5017        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5018        ClusterScheduleOptionValue::Refresh {
5019            hydration_time_estimate: None,
5020        } => ClusterSchedule::Refresh {
5021            hydration_time_estimate: Duration::from_millis(0),
5022        },
5023        // Otherwise we convert the `IntervalValue` to a `Duration`.
5024        ClusterScheduleOptionValue::Refresh {
5025            hydration_time_estimate: Some(interval_value),
5026        } => {
5027            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5028            if interval.as_microseconds() < 0 {
5029                sql_bail!(
5030                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5031                    interval
5032                );
5033            }
5034            if interval.months != 0 {
5035                // This limitation exists because we want this interval to be cleanly
5036                // convertible to a Unix epoch timestamp difference, which no longer holds
5037                // when the interval involves months, because months have variable lengths.
5038                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5039            }
5040            let duration = interval.duration()?;
5041            if u64::try_from(duration.as_millis()).is_err()
5042                || Interval::from_duration(&duration).is_err()
5043            {
5044                sql_bail!("HYDRATION TIME ESTIMATE too large");
5045            }
5046            ClusterSchedule::Refresh {
5047                hydration_time_estimate: duration,
5048            }
5049        }
5050    })
5051}
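// For example (illustrative): `SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour')`
// plans to `ClusterSchedule::Refresh { hydration_time_estimate: Duration::from_secs(3600) }`,
// while estimates such as '-1 hour' or '1 month' are rejected above.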
5052
5053/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5054///
5055/// The reverse of [`plan_cluster_schedule`].
5056fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5057    match schedule {
5058        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5059        ClusterSchedule::Refresh {
5060            hydration_time_estimate,
5061        } => {
5062            let interval = Interval::from_duration(&hydration_time_estimate)
5063                .expect("planning ensured that this is convertible back to Interval");
5064            let interval_value = literal::unplan_interval(&interval);
5065            ClusterScheduleOptionValue::Refresh {
5066                hydration_time_estimate: Some(interval_value),
5067            }
5068        }
5069    }
5070}
5071
5072pub fn describe_create_cluster_replica(
5073    _: &StatementContext,
5074    _: CreateClusterReplicaStatement<Aug>,
5075) -> Result<StatementDesc, PlanError> {
5076    Ok(StatementDesc::new(None))
5077}
5078
5079pub fn plan_create_cluster_replica(
5080    scx: &StatementContext,
5081    CreateClusterReplicaStatement {
5082        definition: ReplicaDefinition { name, options },
5083        of_cluster,
5084    }: CreateClusterReplicaStatement<Aug>,
5085) -> Result<Plan, PlanError> {
5086    let cluster = scx
5087        .catalog
5088        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5089    let current_replica_count = cluster.replica_ids().iter().count();
5090    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5091        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5092        return Err(PlanError::CreateReplicaFailStorageObjects {
5093            current_replica_count,
5094            internal_replica_count,
5095            hypothetical_replica_count: current_replica_count + 1,
5096        });
5097    }
5098
5099    let config = plan_replica_config(scx, options)?;
5100
5101    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5102        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5103            return Err(PlanError::MangedReplicaName(name.into_string()));
5104        }
5105    } else {
5106        ensure_cluster_is_not_managed(scx, cluster.id())?;
5107    }
5108
5109    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5110        name: normalize::ident(name),
5111        cluster_id: cluster.id(),
5112        config,
5113    }))
5114}
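// Note (illustrative): internal orchestrated replicas may not use names
// matching `MANAGED_REPLICA_PATTERN` (names that look managed, e.g. `r1`),
// and any other replica configuration is only accepted on unmanaged clusters.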
5115
5116pub fn describe_create_secret(
5117    _: &StatementContext,
5118    _: CreateSecretStatement<Aug>,
5119) -> Result<StatementDesc, PlanError> {
5120    Ok(StatementDesc::new(None))
5121}
5122
5123pub fn plan_create_secret(
5124    scx: &StatementContext,
5125    stmt: CreateSecretStatement<Aug>,
5126) -> Result<Plan, PlanError> {
5127    let CreateSecretStatement {
5128        name,
5129        if_not_exists,
5130        value,
5131    } = &stmt;
5132
5133    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5134    let mut create_sql_statement = stmt.clone();
5135    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5136    let create_sql =
5137        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5138    let secret_as = query::plan_secret_as(scx, value.clone())?;
5139
5140    let secret = Secret {
5141        create_sql,
5142        secret_as,
5143    };
5144
5145    Ok(Plan::CreateSecret(CreateSecretPlan {
5146        name,
5147        secret,
5148        if_not_exists: *if_not_exists,
5149    }))
5150}
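// Note (summarizing the redaction above): the recorded `create_sql` never
// contains the secret value, so e.g. `CREATE SECRET s AS 'hunter2'` is stored
// as `CREATE SECRET s AS '********'`.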
5151
5152pub fn describe_create_connection(
5153    _: &StatementContext,
5154    _: CreateConnectionStatement<Aug>,
5155) -> Result<StatementDesc, PlanError> {
5156    Ok(StatementDesc::new(None))
5157}
5158
5159generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5160
5161pub fn plan_create_connection(
5162    scx: &StatementContext,
5163    mut stmt: CreateConnectionStatement<Aug>,
5164) -> Result<Plan, PlanError> {
5165    let CreateConnectionStatement {
5166        name,
5167        connection_type,
5168        values,
5169        if_not_exists,
5170        with_options,
5171    } = stmt.clone();
5172    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5173    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5174    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5175
5176    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5177    if options.validate.is_some() {
5178        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5179    }
5180    let validate = match options.validate {
5181        Some(val) => val,
5182        None => {
5183            scx.catalog
5184                .system_vars()
5185                .enable_default_connection_validation()
5186                && details.to_connection().validate_by_default()
5187        }
5188    };
5189
5190    // Check for an object in the catalog with this same name
5191    let full_name = scx.catalog.resolve_full_name(&name);
5192    let partial_name = PartialItemName::from(full_name.clone());
5193    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5194        return Err(PlanError::ItemAlreadyExists {
5195            name: full_name.to_string(),
5196            item_type: item.item_type(),
5197        });
5198    }
5199
5200    // For SSH connections, overwrite the public key options based on the
5201    // connection details, in case we generated new keys during planning.
5202    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5203        stmt.values.retain(|v| {
5204            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5205        });
5206        stmt.values.push(ConnectionOption {
5207            name: ConnectionOptionName::PublicKey1,
5208            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5209        });
5210        stmt.values.push(ConnectionOption {
5211            name: ConnectionOptionName::PublicKey2,
5212            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5213        });
5214    }
5215    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5216
5217    let plan = CreateConnectionPlan {
5218        name,
5219        if_not_exists,
5220        connection: crate::plan::Connection {
5221            create_sql,
5222            details,
5223        },
5224        validate,
5225    };
5226    Ok(Plan::CreateConnection(plan))
5227}
5228
5229fn plan_drop_database(
5230    scx: &StatementContext,
5231    if_exists: bool,
5232    name: &UnresolvedDatabaseName,
5233    cascade: bool,
5234) -> Result<Option<DatabaseId>, PlanError> {
5235    Ok(match resolve_database(scx, name, if_exists)? {
5236        Some(database) => {
5237            if !cascade && database.has_schemas() {
5238                sql_bail!(
5239                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5240                    name,
5241                );
5242            }
5243            Some(database.id())
5244        }
5245        None => None,
5246    })
5247}
5248
5249pub fn describe_drop_objects(
5250    _: &StatementContext,
5251    _: DropObjectsStatement,
5252) -> Result<StatementDesc, PlanError> {
5253    Ok(StatementDesc::new(None))
5254}
5255
5256pub fn plan_drop_objects(
5257    scx: &mut StatementContext,
5258    DropObjectsStatement {
5259        object_type,
5260        if_exists,
5261        names,
5262        cascade,
5263    }: DropObjectsStatement,
5264) -> Result<Plan, PlanError> {
5265    assert_ne!(
5266        object_type,
5267        mz_sql_parser::ast::ObjectType::Func,
5268        "rejected in parser"
5269    );
5270    let object_type = object_type.into();
5271
5272    let mut referenced_ids = Vec::new();
5273    for name in names {
5274        let id = match &name {
5275            UnresolvedObjectName::Cluster(name) => {
5276                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5277            }
5278            UnresolvedObjectName::ClusterReplica(name) => {
5279                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5280            }
5281            UnresolvedObjectName::Database(name) => {
5282                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5283            }
5284            UnresolvedObjectName::Schema(name) => {
5285                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5286            }
5287            UnresolvedObjectName::Role(name) => {
5288                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5289            }
5290            UnresolvedObjectName::Item(name) => {
5291                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5292                    .map(ObjectId::Item)
5293            }
5294            UnresolvedObjectName::NetworkPolicy(name) => {
5295                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5296            }
5297        };
5298        match id {
5299            Some(id) => referenced_ids.push(id),
5300            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5301                name: name.to_ast_string_simple(),
5302                object_type,
5303            }),
5304        }
5305    }
5306    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5307
5308    Ok(Plan::DropObjects(DropObjectsPlan {
5309        referenced_ids,
5310        drop_ids,
5311        object_type,
5312    }))
5313}
5314
5315fn plan_drop_schema(
5316    scx: &StatementContext,
5317    if_exists: bool,
5318    name: &UnresolvedSchemaName,
5319    cascade: bool,
5320) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5321    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5322        Some((database_spec, schema_spec)) => {
5323            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5324                sql_bail!(
5325                    "cannot drop schema {name} because it is required by the database system",
5326                );
5327            }
5328            if let SchemaSpecifier::Temporary = schema_spec {
5329                sql_bail!("cannot drop schema {name} because it is a temporary schema")
5330            }
5331            let schema = scx.get_schema(&database_spec, &schema_spec);
5332            if !cascade && schema.has_items() {
5333                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5334                sql_bail!(
5335                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5336                    full_schema_name
5337                );
5338            }
5339            Some((database_spec, schema_spec))
5340        }
5341        None => None,
5342    })
5343}
5344
5345fn plan_drop_role(
5346    scx: &StatementContext,
5347    if_exists: bool,
5348    name: &Ident,
5349) -> Result<Option<RoleId>, PlanError> {
5350    match scx.catalog.resolve_role(name.as_str()) {
5351        Ok(role) => {
5352            let id = role.id();
5353            if &id == scx.catalog.active_role_id() {
5354                sql_bail!("current role cannot be dropped");
5355            }
5356            for role in scx.catalog.get_roles() {
5357                for (member_id, grantor_id) in role.membership() {
5358                    if &id == grantor_id {
5359                        let member_role = scx.catalog.get_role(member_id);
5360                        sql_bail!(
5361                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
5362                            name.as_str(),
5363                            role.name(),
5364                            member_role.name()
5365                        );
5366                    }
5367                }
5368            }
5369            Ok(Some(role.id()))
5370        }
5371        Err(_) if if_exists => Ok(None),
5372        Err(e) => Err(e.into()),
5373    }
5374}
5375
5376fn plan_drop_cluster(
5377    scx: &StatementContext,
5378    if_exists: bool,
5379    name: &Ident,
5380    cascade: bool,
5381) -> Result<Option<ClusterId>, PlanError> {
5382    Ok(match resolve_cluster(scx, name, if_exists)? {
5383        Some(cluster) => {
5384            if !cascade && !cluster.bound_objects().is_empty() {
5385                return Err(PlanError::DependentObjectsStillExist {
5386                    object_type: "cluster".to_string(),
5387                    object_name: cluster.name().to_string(),
5388                    dependents: Vec::new(),
5389                });
5390            }
5391            Some(cluster.id())
5392        }
5393        None => None,
5394    })
5395}
5396
5397fn plan_drop_network_policy(
5398    scx: &StatementContext,
5399    if_exists: bool,
5400    name: &Ident,
5401) -> Result<Option<NetworkPolicyId>, PlanError> {
5402    match scx.catalog.resolve_network_policy(name.as_str()) {
5403        Ok(policy) => {
5404            // TODO(network_policy): When we support role based network policies, check if any role
5405            // currently has the specified policy set.
5406            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5407                Err(PlanError::NetworkPolicyInUse)
5408            } else {
5409                Ok(Some(policy.id()))
5410            }
5411        }
5412        Err(_) if if_exists => Ok(None),
5413        Err(e) => Err(e.into()),
5414    }
5415}
5416
5417/// Returns `true` if the cluster has any object that requires a single replica
5418/// (a source or sink); in particular, returns `false` if the cluster has no objects.
5419fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5420    // If this feature is enabled, then all objects support multiple replicas.
5421    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5422        false
5423    } else {
5424        // Otherwise, we check for the existence of sources or sinks.
5425        cluster.bound_objects().iter().any(|id| {
5426            let item = scx.catalog.get_item(id);
5427            matches!(
5428                item.item_type(),
5429                CatalogItemType::Sink | CatalogItemType::Source
5430            )
5431        })
5432    }
5433}
5434
5435fn plan_drop_cluster_replica(
5436    scx: &StatementContext,
5437    if_exists: bool,
5438    name: &QualifiedReplica,
5439) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5440    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5441    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5442}
5443
5444/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5445fn plan_drop_item(
5446    scx: &StatementContext,
5447    object_type: ObjectType,
5448    if_exists: bool,
5449    name: UnresolvedItemName,
5450    cascade: bool,
5451) -> Result<Option<CatalogItemId>, PlanError> {
5452    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5453        Ok(r) => r,
5454        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5455        Err(PlanError::MismatchedObjectType {
5456            name,
5457            is_type: ObjectType::MaterializedView,
5458            expected_type: ObjectType::View,
5459        }) => {
5460            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5461        }
5462        e => e?,
5463    };
5464
5465    Ok(match resolved {
5466        Some(catalog_item) => {
5467            if catalog_item.id().is_system() {
5468                sql_bail!(
5469                    "cannot drop {} {} because it is required by the database system",
5470                    catalog_item.item_type(),
5471                    scx.catalog.minimal_qualification(catalog_item.name()),
5472                );
5473            }
5474
5475            if !cascade {
5476                for id in catalog_item.used_by() {
5477                    let dep = scx.catalog.get_item(id);
5478                    if dependency_prevents_drop(object_type, dep) {
5479                        return Err(PlanError::DependentObjectsStillExist {
5480                            object_type: catalog_item.item_type().to_string(),
5481                            object_name: scx
5482                                .catalog
5483                                .minimal_qualification(catalog_item.name())
5484                                .to_string(),
5485                            dependents: vec![(
5486                                dep.item_type().to_string(),
5487                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5488                            )],
5489                        });
5490                    }
5491                }
5492                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5493                //  relies on the entry. Unfortunately, we don't have that information readily available.
5494            }
5495            Some(catalog_item.id())
5496        }
5497        None => None,
5498    })
5499}
5500
5501/// Does the dependency `dep` prevent a non-cascading drop of an object of type `object_type`?
5502fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5503    match object_type {
5504        ObjectType::Type => true,
5505        _ => match dep.item_type() {
5506            CatalogItemType::Func
5507            | CatalogItemType::Table
5508            | CatalogItemType::Source
5509            | CatalogItemType::View
5510            | CatalogItemType::MaterializedView
5511            | CatalogItemType::Sink
5512            | CatalogItemType::Type
5513            | CatalogItemType::Secret
5514            | CatalogItemType::Connection
5515            | CatalogItemType::ContinualTask => true,
5516            CatalogItemType::Index => false,
5517        },
5518    }
5519}
5520
5521pub fn describe_alter_index_options(
5522    _: &StatementContext,
5523    _: AlterIndexStatement<Aug>,
5524) -> Result<StatementDesc, PlanError> {
5525    Ok(StatementDesc::new(None))
5526}
5527
5528pub fn describe_drop_owned(
5529    _: &StatementContext,
5530    _: DropOwnedStatement<Aug>,
5531) -> Result<StatementDesc, PlanError> {
5532    Ok(StatementDesc::new(None))
5533}
5534
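/// Plans `DROP OWNED BY <role> [...]`: collects every object owned by the given roles
/// (replicas, clusters, items, schemas, databases) together with the privileges granted to
/// them, all of which the resulting plan drops or revokes. For example (illustrative):
///
/// ```sql
/// DROP OWNED BY alice, bob CASCADE;
/// ```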
5535pub fn plan_drop_owned(
5536    scx: &StatementContext,
5537    drop: DropOwnedStatement<Aug>,
5538) -> Result<Plan, PlanError> {
5539    let cascade = drop.cascade();
5540    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5541    let mut drop_ids = Vec::new();
5542    let mut privilege_revokes = Vec::new();
5543    let mut default_privilege_revokes = Vec::new();
5544
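    // Helper: for a single object, collect the (object, privilege) pairs whose grantee is
    // among the dropped roles; these become revokes in the resulting plan.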
5545    fn update_privilege_revokes(
5546        object_id: SystemObjectId,
5547        privileges: &PrivilegeMap,
5548        role_ids: &BTreeSet<RoleId>,
5549        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5550    ) {
5551        privilege_revokes.extend(iter::zip(
5552            iter::repeat(object_id),
5553            privileges
5554                .all_values()
5555                .filter(|privilege| role_ids.contains(&privilege.grantee))
5556                .cloned(),
5557        ));
5558    }
5559
5560    // Replicas
5561    for replica in scx.catalog.get_cluster_replicas() {
5562        if role_ids.contains(&replica.owner_id()) {
5563            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5564        }
5565    }
5566
5567    // Clusters
5568    for cluster in scx.catalog.get_clusters() {
5569        if role_ids.contains(&cluster.owner_id()) {
5570            // Note: CASCADE is not required for replicas.
5571            if !cascade {
5572                let non_owned_bound_objects: Vec<_> = cluster
5573                    .bound_objects()
5574                    .into_iter()
5575                    .map(|item_id| scx.catalog.get_item(item_id))
5576                    .filter(|item| !role_ids.contains(&item.owner_id()))
5577                    .collect();
5578                if !non_owned_bound_objects.is_empty() {
5579                    let names: Vec<_> = non_owned_bound_objects
5580                        .into_iter()
5581                        .map(|item| {
5582                            (
5583                                item.item_type().to_string(),
5584                                scx.catalog.resolve_full_name(item.name()).to_string(),
5585                            )
5586                        })
5587                        .collect();
5588                    return Err(PlanError::DependentObjectsStillExist {
5589                        object_type: "cluster".to_string(),
5590                        object_name: cluster.name().to_string(),
5591                        dependents: names,
5592                    });
5593                }
5594            }
5595            drop_ids.push(cluster.id().into());
5596        }
5597        update_privilege_revokes(
5598            SystemObjectId::Object(cluster.id().into()),
5599            cluster.privileges(),
5600            &role_ids,
5601            &mut privilege_revokes,
5602        );
5603    }
5604
5605    // Items
5606    for item in scx.catalog.get_items() {
5607        if role_ids.contains(&item.owner_id()) {
5608            if !cascade {
5609                // Checks if any items still depend on this one, returning an error if so.
5610                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5611                    let non_owned_dependencies: Vec<_> = used_by
5612                        .into_iter()
5613                        .map(|item_id| scx.catalog.get_item(item_id))
5614                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5615                        .filter(|item| !role_ids.contains(&item.owner_id()))
5616                        .collect();
5617                    if !non_owned_dependencies.is_empty() {
5618                        let names: Vec<_> = non_owned_dependencies
5619                            .into_iter()
5620                            .map(|item| {
5621                                let item_typ = item.item_type().to_string();
5622                                let item_name =
5623                                    scx.catalog.resolve_full_name(item.name()).to_string();
5624                                (item_typ, item_name)
5625                            })
5626                            .collect();
5627                        Err(PlanError::DependentObjectsStillExist {
5628                            object_type: item.item_type().to_string(),
5629                            object_name: scx
5630                                .catalog
5631                                .resolve_full_name(item.name())
                                .to_string(),
5634                            dependents: names,
5635                        })
5636                    } else {
5637                        Ok(())
5638                    }
5639                };
5640
                // When this item gets dropped it will also drop its progress source, so we
                // need to check the users of that progress source as well.
5643                if let Some(id) = item.progress_id() {
5644                    let progress_item = scx.catalog.get_item(&id);
5645                    check_if_dependents_exist(progress_item.used_by())?;
5646                }
5647                check_if_dependents_exist(item.used_by())?;
5648            }
5649            drop_ids.push(item.id().into());
5650        }
5651        update_privilege_revokes(
5652            SystemObjectId::Object(item.id().into()),
5653            item.privileges(),
5654            &role_ids,
5655            &mut privilege_revokes,
5656        );
5657    }
5658
5659    // Schemas
5660    for schema in scx.catalog.get_schemas() {
5661        if !schema.id().is_temporary() {
5662            if role_ids.contains(&schema.owner_id()) {
5663                if !cascade {
5664                    let non_owned_dependencies: Vec<_> = schema
5665                        .item_ids()
5666                        .map(|item_id| scx.catalog.get_item(&item_id))
5667                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5668                        .filter(|item| !role_ids.contains(&item.owner_id()))
5669                        .collect();
5670                    if !non_owned_dependencies.is_empty() {
5671                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5672                        sql_bail!(
5673                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5674                            full_schema_name.to_string().quoted()
5675                        );
5676                    }
5677                }
5678                drop_ids.push((*schema.database(), *schema.id()).into())
5679            }
5680            update_privilege_revokes(
5681                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5682                schema.privileges(),
5683                &role_ids,
5684                &mut privilege_revokes,
5685            );
5686        }
5687    }
5688
5689    // Databases
5690    for database in scx.catalog.get_databases() {
5691        if role_ids.contains(&database.owner_id()) {
5692            if !cascade {
5693                let non_owned_schemas: Vec<_> = database
5694                    .schemas()
5695                    .into_iter()
5696                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5697                    .collect();
5698                if !non_owned_schemas.is_empty() {
5699                    sql_bail!(
5700                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5701                        database.name().quoted(),
5702                    );
5703                }
5704            }
5705            drop_ids.push(database.id().into());
5706        }
5707        update_privilege_revokes(
5708            SystemObjectId::Object(database.id().into()),
5709            database.privileges(),
5710            &role_ids,
5711            &mut privilege_revokes,
5712        );
5713    }
5714
5715    // System
5716    update_privilege_revokes(
5717        SystemObjectId::System,
5718        scx.catalog.get_system_privileges(),
5719        &role_ids,
5720        &mut privilege_revokes,
5721    );
5722
5723    for (default_privilege_object, default_privilege_acl_items) in
5724        scx.catalog.get_default_privileges()
5725    {
5726        for default_privilege_acl_item in default_privilege_acl_items {
5727            if role_ids.contains(&default_privilege_object.role_id)
5728                || role_ids.contains(&default_privilege_acl_item.grantee)
5729            {
5730                default_privilege_revokes.push((
5731                    default_privilege_object.clone(),
5732                    default_privilege_acl_item.clone(),
5733                ));
5734            }
5735        }
5736    }
5737
5738    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5739
5740    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5741    if !system_ids.is_empty() {
5742        let mut owners = system_ids
5743            .into_iter()
5744            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5745            .collect::<BTreeSet<_>>()
5746            .into_iter()
5747            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5748        sql_bail!(
5749            "cannot drop objects owned by role {} because they are required by the database system",
5750            owners.join(", "),
5751        );
5752    }
5753
5754    Ok(Plan::DropOwned(DropOwnedPlan {
5755        role_ids: role_ids.into_iter().collect(),
5756        drop_ids,
5757        privilege_revokes,
5758        default_privilege_revokes,
5759    }))
5760}
5761
5762fn plan_retain_history_option(
5763    scx: &StatementContext,
5764    retain_history: Option<OptionalDuration>,
5765) -> Result<Option<CompactionWindow>, PlanError> {
5766    if let Some(OptionalDuration(lcw)) = retain_history {
5767        Ok(Some(plan_retain_history(scx, lcw)?))
5768    } else {
5769        Ok(None)
5770    }
5771}
5772
/// Converts a specified `RETAIN HISTORY` option into a compaction window. `None` corresponds
/// to `DisableCompaction`. A zero duration is an internal error, because the `OptionalDuration`
/// type already converts a user-specified zero duration into `None`. This function must not be
/// called in the `RESET (RETAIN HISTORY)` path; that case should be handled by the outer
/// `Option<OptionalDuration>` being `None`.
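///
/// For example (illustrative):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '1hr'); -- lcw = Some(1 hour)
/// ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '0');   -- lcw = None: disable compaction
/// ```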
5778fn plan_retain_history(
5779    scx: &StatementContext,
5780    lcw: Option<Duration>,
5781) -> Result<CompactionWindow, PlanError> {
5782    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5783    match lcw {
5784        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
5785        // disable compaction), and should never occur here. Furthermore, some things actually do
5786        // break when this is set to real zero:
5787        // https://github.com/MaterializeInc/database-issues/issues/3798.
5788        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5789            option_name: "RETAIN HISTORY".to_string(),
5790            err: Box::new(PlanError::Unstructured(
5791                "internal error: unexpectedly zero".to_string(),
5792            )),
5793        }),
5794        Some(duration) => {
            // Error if the duration is below the default and enable_unlimited_retain_history
            // is not set (setting that flag should only be possible during testing).
5797            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5798                && scx
5799                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5800                    .is_err()
5801            {
5802                return Err(PlanError::RetainHistoryLow {
5803                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5804                });
5805            }
5806            Ok(duration.try_into()?)
5807        }
        // In the past, `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction
        // is almost always a bad choice, so prevent it unless the unlimited-retain-history
        // feature flag is enabled.
5810        None => {
5811            if scx
5812                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5813                .is_err()
5814            {
5815                Err(PlanError::RetainHistoryRequired)
5816            } else {
5817                Ok(CompactionWindow::DisableCompaction)
5818            }
5819        }
5820    }
5821}
5822
5823generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
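// `generate_extracted_config!` derives an `IndexOptionExtracted` struct with one typed field
// per option plus a `seen` set of the option names that were specified, and a fallible
// conversion from `Vec<IndexOption<Aug>>` that rejects duplicate options. (A sketch of the
// expansion; see the macro definition for the exact shape.)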
5824
5825fn plan_index_options(
5826    scx: &StatementContext,
5827    with_opts: Vec<IndexOption<Aug>>,
5828) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5829    if !with_opts.is_empty() {
5830        // Index options are not durable.
5831        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5832    }
5833
5834    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5835
5836    let mut out = Vec::with_capacity(1);
5837    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5838        out.push(crate::plan::IndexOption::RetainHistory(cw));
5839    }
5840    Ok(out)
5841}
5842
5843generate_extracted_config!(
5844    TableOption,
5845    (PartitionBy, Vec<Ident>),
5846    (RetainHistory, OptionalDuration),
5847    (RedactedTest, String)
5848);
5849
5850fn plan_table_options(
5851    scx: &StatementContext,
5852    desc: &RelationDesc,
5853    with_opts: Vec<TableOption<Aug>>,
5854) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5855    let TableOptionExtracted {
5856        partition_by,
5857        retain_history,
5858        redacted_test,
5859        ..
5860    }: TableOptionExtracted = with_opts.try_into()?;
5861
5862    if let Some(partition_by) = partition_by {
5863        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5864        check_partition_by(desc, partition_by)?;
5865    }
5866
5867    if redacted_test.is_some() {
5868        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5869    }
5870
5871    let mut out = Vec::with_capacity(1);
5872    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5873        out.push(crate::plan::TableOption::RetainHistory(cw));
5874    }
5875    Ok(out)
5876}
5877
5878pub fn plan_alter_index_options(
5879    scx: &mut StatementContext,
5880    AlterIndexStatement {
5881        index_name,
5882        if_exists,
5883        action,
5884    }: AlterIndexStatement<Aug>,
5885) -> Result<Plan, PlanError> {
5886    let object_type = ObjectType::Index;
5887    match action {
5888        AlterIndexAction::ResetOptions(options) => {
5889            let mut options = options.into_iter();
5890            if let Some(opt) = options.next() {
5891                match opt {
5892                    IndexOptionName::RetainHistory => {
5893                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
5895                        }
5896                        return alter_retain_history(
5897                            scx,
5898                            object_type,
5899                            if_exists,
5900                            UnresolvedObjectName::Item(index_name),
5901                            None,
5902                        );
5903                    }
5904                }
5905            }
5906            sql_bail!("expected option");
5907        }
5908        AlterIndexAction::SetOptions(options) => {
5909            let mut options = options.into_iter();
5910            if let Some(opt) = options.next() {
5911                match opt.name {
5912                    IndexOptionName::RetainHistory => {
5913                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
5915                        }
5916                        return alter_retain_history(
5917                            scx,
5918                            object_type,
5919                            if_exists,
5920                            UnresolvedObjectName::Item(index_name),
5921                            opt.value,
5922                        );
5923                    }
5924                }
5925            }
5926            sql_bail!("expected option");
5927        }
5928    }
5929}
5930
5931pub fn describe_alter_cluster_set_options(
5932    _: &StatementContext,
5933    _: AlterClusterStatement<Aug>,
5934) -> Result<StatementDesc, PlanError> {
5935    Ok(StatementDesc::new(None))
5936}
5937
5938pub fn plan_alter_cluster(
5939    scx: &mut StatementContext,
5940    AlterClusterStatement {
5941        name,
5942        action,
5943        if_exists,
5944    }: AlterClusterStatement<Aug>,
5945) -> Result<Plan, PlanError> {
5946    let cluster = match resolve_cluster(scx, &name, if_exists)? {
5947        Some(entry) => entry,
5948        None => {
5949            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5950                name: name.to_ast_string_simple(),
5951                object_type: ObjectType::Cluster,
5952            });
5953
5954            return Ok(Plan::AlterNoop(AlterNoopPlan {
5955                object_type: ObjectType::Cluster,
5956            }));
5957        }
5958    };
5959
5960    let mut options: PlanClusterOption = Default::default();
5961    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
5962
5963    match action {
5964        AlterClusterAction::SetOptions {
5965            options: set_options,
5966            with_options,
5967        } => {
5968            let ClusterOptionExtracted {
5969                availability_zones,
5970                introspection_debugging,
5971                introspection_interval,
5972                managed,
5973                replicas: replica_defs,
5974                replication_factor,
5975                seen: _,
5976                size,
5977                disk,
5978                schedule,
5979                workload_class,
5980            }: ClusterOptionExtracted = set_options.try_into()?;
5981
            if !scx.catalog.active_role_id().is_system() && workload_class.is_some() {
                sql_bail!("WORKLOAD CLASS not supported for non-system users");
            }
5987
5988            match managed.unwrap_or_else(|| cluster.is_managed()) {
5989                true => {
5990                    let alter_strategy_extracted =
5991                        ClusterAlterOptionExtracted::try_from(with_options)?;
5992                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
5993
5994                    match alter_strategy {
5995                        AlterClusterPlanStrategy::None => {}
5996                        _ => {
5997                            scx.require_feature_flag(
5998                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
5999                            )?;
6000                        }
6001                    }
6002
6003                    if replica_defs.is_some() {
6004                        sql_bail!("REPLICAS not supported for managed clusters");
6005                    }
6006                    if schedule.is_some()
6007                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6008                    {
6009                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6010                    }
6011
6012                    if let Some(replication_factor) = replication_factor {
6013                        if schedule.is_some()
6014                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6015                        {
6016                            sql_bail!(
6017                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6018                            );
6019                        }
6020                        if let Some(current_schedule) = cluster.schedule() {
6021                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6022                                sql_bail!(
6023                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6024                                );
6025                            }
6026                        }
6027
6028                        let internal_replica_count =
6029                            cluster.replicas().iter().filter(|r| r.internal()).count();
6030                        let hypothetical_replica_count =
6031                            internal_replica_count + usize::cast_from(replication_factor);
6032
6033                        // Total number of replicas running is internal replicas
6034                        // + replication factor.
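                        // For example (illustrative): with one internal replica and
                        // REPLICATION FACTOR 2, the hypothetical replica count is 3, so the
                        // change must be rejected if the cluster contains a source or sink.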
6035                        if contains_single_replica_objects(scx, cluster)
6036                            && hypothetical_replica_count > 1
6037                        {
6038                            return Err(PlanError::CreateReplicaFailStorageObjects {
6039                                current_replica_count: cluster.replica_ids().iter().count(),
6040                                internal_replica_count,
6041                                hypothetical_replica_count,
6042                            });
6043                        }
6044                    } else if alter_strategy.is_some() {
                        // Any `AlterClusterPlanStrategy` other than `None` will stand up
                        // pending replicas of the new configuration, violating the
                        // single-replica constraint for sources. If the cluster contains any
                        // storage objects (sources or sinks), we must fail.
6048                        let internal_replica_count =
6049                            cluster.replicas().iter().filter(|r| r.internal()).count();
6050                        let hypothetical_replica_count = internal_replica_count * 2;
6051                        if contains_single_replica_objects(scx, cluster) {
6052                            return Err(PlanError::CreateReplicaFailStorageObjects {
6053                                current_replica_count: cluster.replica_ids().iter().count(),
6054                                internal_replica_count,
6055                                hypothetical_replica_count,
6056                            });
6057                        }
6058                    }
6059                }
6060                false => {
6061                    if !alter_strategy.is_none() {
6062                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6063                    }
6064                    if availability_zones.is_some() {
6065                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6066                    }
6067                    if replication_factor.is_some() {
6068                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6069                    }
6070                    if introspection_debugging.is_some() {
6071                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6072                    }
6073                    if introspection_interval.is_some() {
6074                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6075                    }
6076                    if size.is_some() {
6077                        sql_bail!("SIZE not supported for unmanaged clusters");
6078                    }
6079                    if disk.is_some() {
6080                        sql_bail!("DISK not supported for unmanaged clusters");
6081                    }
6082                    if schedule.is_some()
6083                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6084                    {
6085                        sql_bail!(
6086                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6087                        );
6088                    }
6089                    if let Some(current_schedule) = cluster.schedule() {
6090                        if !matches!(current_schedule, ClusterSchedule::Manual)
6091                            && schedule.is_none()
6092                        {
6093                            sql_bail!(
6094                                "when switching a cluster to unmanaged, if the managed \
6095                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6096                                explicitly set the SCHEDULE to MANUAL"
6097                            );
6098                        }
6099                    }
6100                }
6101            }
6102
6103            let mut replicas = vec![];
6104            for ReplicaDefinition { name, options } in
6105                replica_defs.into_iter().flat_map(Vec::into_iter)
6106            {
6107                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6108            }
6109
6110            if let Some(managed) = managed {
6111                options.managed = AlterOptionParameter::Set(managed);
6112            }
6113            if let Some(replication_factor) = replication_factor {
6114                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6115            }
6116            if let Some(size) = &size {
6117                options.size = AlterOptionParameter::Set(size.clone());
6118            }
6119            if let Some(availability_zones) = availability_zones {
6120                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6121            }
6122            if let Some(introspection_debugging) = introspection_debugging {
6123                options.introspection_debugging =
6124                    AlterOptionParameter::Set(introspection_debugging);
6125            }
6126            if let Some(introspection_interval) = introspection_interval {
6127                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6128            }
6129            if disk.is_some() {
6130                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6131                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
6132                // we'll be able to remove the `DISK` option entirely.
6133                let size = size.as_deref().unwrap_or_else(|| {
6134                    cluster.managed_size().expect("cluster known to be managed")
6135                });
6136                if scx.catalog.is_cluster_size_cc(size) {
6137                    sql_bail!(
6138                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6139                    );
6140                }
6141
6142                scx.catalog
6143                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6144            }
6145            if !replicas.is_empty() {
6146                options.replicas = AlterOptionParameter::Set(replicas);
6147            }
6148            if let Some(schedule) = schedule {
6149                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6150            }
6151            if let Some(workload_class) = workload_class {
6152                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6153            }
6154        }
6155        AlterClusterAction::ResetOptions(reset_options) => {
6156            use AlterOptionParameter::Reset;
6157            use ClusterOptionName::*;
6158
            if !scx.catalog.active_role_id().is_system() && reset_options.contains(&WorkloadClass)
            {
                sql_bail!("WORKLOAD CLASS not supported for non-system users");
            }
6164
6165            for option in reset_options {
6166                match option {
6167                    AvailabilityZones => options.availability_zones = Reset,
6168                    Disk => scx
6169                        .catalog
6170                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6171                    IntrospectionInterval => options.introspection_interval = Reset,
6172                    IntrospectionDebugging => options.introspection_debugging = Reset,
6173                    Managed => options.managed = Reset,
6174                    Replicas => options.replicas = Reset,
6175                    ReplicationFactor => options.replication_factor = Reset,
6176                    Size => options.size = Reset,
6177                    Schedule => options.schedule = Reset,
6178                    WorkloadClass => options.workload_class = Reset,
6179                }
6180            }
6181        }
6182    }
6183    Ok(Plan::AlterCluster(AlterClusterPlan {
6184        id: cluster.id(),
6185        name: cluster.name().to_string(),
6186        options,
6187        strategy: alter_strategy,
6188    }))
6189}
6190
6191pub fn describe_alter_set_cluster(
6192    _: &StatementContext,
6193    _: AlterSetClusterStatement<Aug>,
6194) -> Result<StatementDesc, PlanError> {
6195    Ok(StatementDesc::new(None))
6196}
6197
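/// Plans e.g. (illustrative) `ALTER MATERIALIZED VIEW mv SET CLUSTER c2`. Only materialized
/// views support `SET CLUSTER` today; a no-op plan is returned if the item already lives on
/// the target cluster.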
6198pub fn plan_alter_item_set_cluster(
6199    scx: &StatementContext,
6200    AlterSetClusterStatement {
6201        if_exists,
6202        set_cluster: in_cluster_name,
6203        name,
6204        object_type,
6205    }: AlterSetClusterStatement<Aug>,
6206) -> Result<Plan, PlanError> {
6207    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6208
6209    let object_type = object_type.into();
6210
6211    // Prevent access to `SET CLUSTER` for unsupported objects.
6212    match object_type {
6213        ObjectType::MaterializedView => {}
6214        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6215            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6216        }
6217        _ => {
6218            bail_never_supported!(
6219                format!("ALTER {object_type} SET CLUSTER"),
6220                "sql/alter-set-cluster/",
6221                format!("{object_type} has no associated cluster")
6222            )
6223        }
6224    }
6225
6226    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6227
6228    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6229        Some(entry) => {
6230            let current_cluster = entry.cluster_id();
6231            let Some(current_cluster) = current_cluster else {
6232                sql_bail!("No cluster associated with {name}");
6233            };
6234
6235            if current_cluster == in_cluster.id() {
6236                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6237            } else {
6238                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6239                    id: entry.id(),
6240                    set_cluster: in_cluster.id(),
6241                }))
6242            }
6243        }
6244        None => {
6245            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6246                name: name.to_ast_string_simple(),
6247                object_type,
6248            });
6249
6250            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6251        }
6252    }
6253}
6254
6255pub fn describe_alter_object_rename(
6256    _: &StatementContext,
6257    _: AlterObjectRenameStatement,
6258) -> Result<StatementDesc, PlanError> {
6259    Ok(StatementDesc::new(None))
6260}
6261
6262pub fn plan_alter_object_rename(
6263    scx: &mut StatementContext,
6264    AlterObjectRenameStatement {
6265        name,
6266        object_type,
6267        to_item_name,
6268        if_exists,
6269    }: AlterObjectRenameStatement,
6270) -> Result<Plan, PlanError> {
6271    let object_type = object_type.into();
6272    match (object_type, name) {
6273        (
6274            ObjectType::View
6275            | ObjectType::MaterializedView
6276            | ObjectType::Table
6277            | ObjectType::Source
6278            | ObjectType::Index
6279            | ObjectType::Sink
6280            | ObjectType::Secret
6281            | ObjectType::Connection,
6282            UnresolvedObjectName::Item(name),
6283        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6284        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6285            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6286        }
6287        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6288            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6289        }
6290        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6291            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6292        }
6293        (object_type, name) => {
6294            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6295        }
6296    }
6297}
6298
6299pub fn plan_alter_schema_rename(
6300    scx: &mut StatementContext,
6301    name: UnresolvedSchemaName,
6302    to_schema_name: Ident,
6303    if_exists: bool,
6304) -> Result<Plan, PlanError> {
6305    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6306        let object_type = ObjectType::Schema;
6307        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6308            name: name.to_ast_string_simple(),
6309            object_type,
6310        });
6311        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6312    };
6313
6314    // Make sure the name is unique.
6315    if scx
6316        .resolve_schema_in_database(&db_spec, &to_schema_name)
6317        .is_ok()
6318    {
6319        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6320            to_schema_name.clone().into_string(),
6321        )));
6322    }
6323
6324    // Prevent users from renaming system related schemas.
6325    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6326    if schema.id().is_system() {
6327        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6328    }
6329
6330    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6331        cur_schema_spec: (db_spec, schema_spec),
6332        new_schema_name: to_schema_name.into_string(),
6333    }))
6334}
6335
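/// Plans `ALTER SCHEMA ... SWAP WITH ...` by renaming through a temporary schema name, e.g.
/// (a sketch): `a` becomes `mz_schema_swap_<suffix>`, `b` becomes `a`, and the temporary
/// schema becomes `b`.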
6336pub fn plan_alter_schema_swap<F>(
6337    scx: &mut StatementContext,
6338    name_a: UnresolvedSchemaName,
6339    name_b: Ident,
6340    gen_temp_suffix: F,
6341) -> Result<Plan, PlanError>
6342where
6343    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6344{
6345    let schema_a = scx.resolve_schema(name_a.clone())?;
6346
6347    let db_spec = schema_a.database().clone();
6348    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6349        sql_bail!("cannot swap schemas that are in the ambient database");
6350    };
6351    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6352
6353    // We cannot swap system schemas.
6354    if schema_a.id().is_system() || schema_b.id().is_system() {
6355        bail_never_supported!("swapping a system schema".to_string())
6356    }
6357
6358    // Generate a temporary name we can swap schema_a to.
6359    //
    // `check` returns whether the candidate temp schema name is free to use.
6361    let check = |temp_suffix: &str| {
6362        let mut temp_name = ident!("mz_schema_swap_");
6363        temp_name.append_lossy(temp_suffix);
6364        scx.resolve_schema_in_database(&db_spec, &temp_name)
6365            .is_err()
6366    };
6367    let temp_suffix = gen_temp_suffix(&check)?;
6368    let name_temp = format!("mz_schema_swap_{temp_suffix}");
6369
6370    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6371        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6372        schema_a_name: schema_a.name().schema.to_string(),
6373        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6374        schema_b_name: schema_b.name().schema.to_string(),
6375        name_temp,
6376    }))
6377}
6378
6379pub fn plan_alter_item_rename(
6380    scx: &mut StatementContext,
6381    object_type: ObjectType,
6382    name: UnresolvedItemName,
6383    to_item_name: Ident,
6384    if_exists: bool,
6385) -> Result<Plan, PlanError> {
6386    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6387        Ok(r) => r,
6388        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6389        Err(PlanError::MismatchedObjectType {
6390            name,
6391            is_type: ObjectType::MaterializedView,
6392            expected_type: ObjectType::View,
6393        }) => {
6394            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6395        }
6396        e => e?,
6397    };
6398
6399    match resolved {
6400        Some(entry) => {
6401            let full_name = scx.catalog.resolve_full_name(entry.name());
6402            let item_type = entry.item_type();
6403
6404            let proposed_name = QualifiedItemName {
6405                qualifiers: entry.name().qualifiers.clone(),
6406                item: to_item_name.clone().into_string(),
6407            };
6408
6409            // For PostgreSQL compatibility, items and types cannot have
6410            // overlapping names in a variety of situations. See the comment on
6411            // `CatalogItemType::conflicts_with_type` for details.
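            // For example (illustrative): renaming a view to the name of an existing type is
            // rejected, just as creating a view under that name would be.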
6412            let conflicting_type_exists;
6413            let conflicting_item_exists;
6414            if item_type == CatalogItemType::Type {
6415                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6416                conflicting_item_exists = scx
6417                    .catalog
6418                    .get_item_by_name(&proposed_name)
6419                    .map(|item| item.item_type().conflicts_with_type())
6420                    .unwrap_or(false);
6421            } else {
6422                conflicting_type_exists = item_type.conflicts_with_type()
6423                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6424                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6425            };
6426            if conflicting_type_exists || conflicting_item_exists {
6427                sql_bail!("catalog item '{}' already exists", to_item_name);
6428            }
6429
6430            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6431                id: entry.id(),
6432                current_full_name: full_name,
6433                to_name: normalize::ident(to_item_name),
6434                object_type,
6435            }))
6436        }
6437        None => {
6438            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6439                name: name.to_ast_string_simple(),
6440                object_type,
6441            });
6442
6443            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6444        }
6445    }
6446}
6447
6448pub fn plan_alter_cluster_rename(
6449    scx: &mut StatementContext,
6450    object_type: ObjectType,
6451    name: Ident,
6452    to_name: Ident,
6453    if_exists: bool,
6454) -> Result<Plan, PlanError> {
6455    match resolve_cluster(scx, &name, if_exists)? {
6456        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6457            id: entry.id(),
6458            name: entry.name().to_string(),
6459            to_name: ident(to_name),
6460        })),
6461        None => {
6462            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6463                name: name.to_ast_string_simple(),
6464                object_type,
6465            });
6466
6467            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6468        }
6469    }
6470}
6471
6472pub fn plan_alter_cluster_swap<F>(
6473    scx: &mut StatementContext,
6474    name_a: Ident,
6475    name_b: Ident,
6476    gen_temp_suffix: F,
6477) -> Result<Plan, PlanError>
6478where
6479    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6480{
6481    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6482    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6483
6484    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_cluster_swap_");
6486        temp_name.append_lossy(temp_suffix);
6487        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6488            // Temp name does not exist, so we can use it.
6489            Err(CatalogError::UnknownCluster(_)) => true,
            // Temp name already exists, or the lookup failed; either way, don't use it.
6491            Ok(_) | Err(_) => false,
6492        }
6493    };
6494    let temp_suffix = gen_temp_suffix(&check)?;
6495    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6496
6497    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6498        id_a: cluster_a.id(),
6499        id_b: cluster_b.id(),
6500        name_a: name_a.into_string(),
6501        name_b: name_b.into_string(),
6502        name_temp,
6503    }))
6504}
6505
6506pub fn plan_alter_cluster_replica_rename(
6507    scx: &mut StatementContext,
6508    object_type: ObjectType,
6509    name: QualifiedReplica,
6510    to_item_name: Ident,
6511    if_exists: bool,
6512) -> Result<Plan, PlanError> {
6513    match resolve_cluster_replica(scx, &name, if_exists)? {
6514        Some((cluster, replica)) => {
6515            ensure_cluster_is_not_managed(scx, cluster.id())?;
6516            Ok(Plan::AlterClusterReplicaRename(
6517                AlterClusterReplicaRenamePlan {
6518                    cluster_id: cluster.id(),
6519                    replica_id: replica,
6520                    name: QualifiedReplica {
6521                        cluster: Ident::new(cluster.name())?,
6522                        replica: name.replica,
6523                    },
6524                    to_name: normalize::ident(to_item_name),
6525                },
6526            ))
6527        }
6528        None => {
6529            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6530                name: name.to_ast_string_simple(),
6531                object_type,
6532            });
6533
6534            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6535        }
6536    }
6537}
6538
6539pub fn describe_alter_object_swap(
6540    _: &StatementContext,
6541    _: AlterObjectSwapStatement,
6542) -> Result<StatementDesc, PlanError> {
6543    Ok(StatementDesc::new(None))
6544}
6545
6546pub fn plan_alter_object_swap(
6547    scx: &mut StatementContext,
6548    stmt: AlterObjectSwapStatement,
6549) -> Result<Plan, PlanError> {
6550    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6551
6552    let AlterObjectSwapStatement {
6553        object_type,
6554        name_a,
6555        name_b,
6556    } = stmt;
6557    let object_type = object_type.into();
6558
6559    // We'll try 10 times to generate a temporary suffix.
6560    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6561        let mut attempts = 0;
6562        let name_temp = loop {
6563            attempts += 1;
6564            if attempts > 10 {
6565                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to generate a unique temporary name for the swap");
6567            }
6568
6569            // Call the provided closure to make sure this name is unique!
6570            let short_id = mz_ore::id_gen::temp_id();
6571            if check_fn(&short_id) {
6572                break short_id;
6573            }
6574        };
6575
6576        Ok(name_temp)
6577    };
6578
6579    match (object_type, name_a, name_b) {
6580        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6581            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6582        }
6583        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6584            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6585        }
6586        (object_type, _, _) => Err(PlanError::Unsupported {
6587            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6588            discussion_no: None,
6589        }),
6590    }
6591}
6592
6593pub fn describe_alter_retain_history(
6594    _: &StatementContext,
6595    _: AlterRetainHistoryStatement<Aug>,
6596) -> Result<StatementDesc, PlanError> {
6597    Ok(StatementDesc::new(None))
6598}
6599
6600pub fn plan_alter_retain_history(
6601    scx: &StatementContext,
6602    AlterRetainHistoryStatement {
6603        object_type,
6604        if_exists,
6605        name,
6606        history,
6607    }: AlterRetainHistoryStatement<Aug>,
6608) -> Result<Plan, PlanError> {
6609    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6610}
6611
6612fn alter_retain_history(
6613    scx: &StatementContext,
6614    object_type: ObjectType,
6615    if_exists: bool,
6616    name: UnresolvedObjectName,
6617    history: Option<WithOptionValue<Aug>>,
6618) -> Result<Plan, PlanError> {
6619    let name = match (object_type, name) {
6620        (
6621            // View gets a special error below.
6622            ObjectType::View
6623            | ObjectType::MaterializedView
6624            | ObjectType::Table
6625            | ObjectType::Source
6626            | ObjectType::Index,
6627            UnresolvedObjectName::Item(name),
6628        ) => name,
6629        (object_type, _) => {
6630            sql_bail!("{object_type} does not support RETAIN HISTORY")
6631        }
6632    };
6633    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6634        Some(entry) => {
6635            let full_name = scx.catalog.resolve_full_name(entry.name());
6636            let item_type = entry.item_type();
6637
6638            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6639            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6640                return Err(PlanError::AlterViewOnMaterializedView(
6641                    full_name.to_string(),
6642                ));
6643            } else if object_type == ObjectType::View {
6644                sql_bail!("{object_type} does not support RETAIN HISTORY")
6645            } else if object_type != item_type {
6646                sql_bail!(
6647                    "\"{}\" is a {} not a {}",
6648                    full_name,
6649                    entry.item_type(),
6650                    format!("{object_type}").to_lowercase()
6651                )
6652            }
6653
6654            // Save the original value so we can write it back down in the create_sql catalog item.
6655            let (value, lcw) = match &history {
6656                Some(WithOptionValue::RetainHistoryFor(value)) => {
6657                    let window = OptionalDuration::try_from_value(value.clone())?;
6658                    (Some(value.clone()), window.0)
6659                }
6660                // None is RESET, so use the default CW.
6661                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6662                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6663            };
6664            let window = plan_retain_history(scx, lcw)?;
6665
6666            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6667                id: entry.id(),
6668                value,
6669                window,
6670                object_type,
6671            }))
6672        }
6673        None => {
6674            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6675                name: name.to_ast_string_simple(),
6676                object_type,
6677            });
6678
6679            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6680        }
6681    }
6682}
6683
6684pub fn describe_alter_secret_options(
6685    _: &StatementContext,
6686    _: AlterSecretStatement<Aug>,
6687) -> Result<StatementDesc, PlanError> {
6688    Ok(StatementDesc::new(None))
6689}
6690
6691pub fn plan_alter_secret(
6692    scx: &mut StatementContext,
6693    stmt: AlterSecretStatement<Aug>,
6694) -> Result<Plan, PlanError> {
6695    let AlterSecretStatement {
6696        name,
6697        if_exists,
6698        value,
6699    } = stmt;
6700    let object_type = ObjectType::Secret;
6701    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6702        Some(entry) => entry.id(),
6703        None => {
6704            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6705                name: name.to_string(),
6706                object_type,
6707            });
6708
6709            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6710        }
6711    };
6712
6713    let secret_as = query::plan_secret_as(scx, value)?;
6714
6715    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6716}
6717
6718pub fn describe_alter_connection(
6719    _: &StatementContext,
6720    _: AlterConnectionStatement<Aug>,
6721) -> Result<StatementDesc, PlanError> {
6722    Ok(StatementDesc::new(None))
6723}
6724
6725generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6726
6727pub fn plan_alter_connection(
6728    scx: &StatementContext,
6729    stmt: AlterConnectionStatement<Aug>,
6730) -> Result<Plan, PlanError> {
6731    let AlterConnectionStatement {
6732        name,
6733        if_exists,
6734        actions,
6735        with_options,
6736    } = stmt;
6737    let conn_name = normalize::unresolved_item_name(name)?;
6738    let entry = match scx.catalog.resolve_item(&conn_name) {
6739        Ok(entry) => entry,
6740        Err(_) if if_exists => {
6741            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6742                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
6744            });
6745
6746            return Ok(Plan::AlterNoop(AlterNoopPlan {
6747                object_type: ObjectType::Connection,
6748            }));
6749        }
6750        Err(e) => return Err(e.into()),
6751    };
6752
6753    let connection = entry.connection()?;
6754
6755    if actions
6756        .iter()
6757        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6758    {
6759        if actions.len() > 1 {
6760            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6761        }
6762
6763        if !with_options.is_empty() {
6764            sql_bail!(
6765                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6766                with_options
6767                    .iter()
6768                    .map(|o| o.to_ast_string_simple())
6769                    .join(", ")
6770            );
6771        }
6772
6773        if !matches!(connection, Connection::Ssh(_)) {
6774            sql_bail!(
6775                "{} is not an SSH connection",
6776                scx.catalog.resolve_full_name(entry.name())
6777            )
6778        }
6779
6780        return Ok(Plan::AlterConnection(AlterConnectionPlan {
6781            id: entry.id(),
6782            action: crate::plan::AlterConnectionAction::RotateKeys,
6783        }));
6784    }
6785
6786    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6787    if options.validate.is_some() {
6788        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6789    }
6790
6791    let validate = match options.validate {
6792        Some(val) => val,
6793        None => {
6794            scx.catalog
6795                .system_vars()
6796                .enable_default_connection_validation()
6797                && connection.validate_by_default()
6798        }
6799    };
6800
6801    let connection_type = match connection {
6802        Connection::Aws(_) => CreateConnectionType::Aws,
6803        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6804        Connection::Kafka(_) => CreateConnectionType::Kafka,
6805        Connection::Csr(_) => CreateConnectionType::Csr,
6806        Connection::Postgres(_) => CreateConnectionType::Postgres,
6807        Connection::Ssh(_) => CreateConnectionType::Ssh,
6808        Connection::MySql(_) => CreateConnectionType::MySql,
6809        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6810        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
6811    };
6812
6813    // Collect all options irrespective of action taken on them.
6814    let specified_options: BTreeSet<_> = actions
6815        .iter()
6816        .map(|action: &AlterConnectionAction<Aug>| match action {
6817            AlterConnectionAction::SetOption(option) => option.name.clone(),
6818            AlterConnectionAction::DropOption(name) => name.clone(),
6819            AlterConnectionAction::RotateKeys => unreachable!(),
6820        })
6821        .collect();
6822
6823    for invalid in INALTERABLE_OPTIONS {
6824        if specified_options.contains(invalid) {
6825            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6826        }
6827    }
6828
6829    connection::validate_options_per_connection_type(connection_type, specified_options)?;
6830
6831    // Partition operations into set and drop
6832    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
6833        actions.into_iter().partition_map(|action| match action {
6834            AlterConnectionAction::SetOption(option) => Either::Left(option),
6835            AlterConnectionAction::DropOption(name) => Either::Right(name),
6836            AlterConnectionAction::RotateKeys => unreachable!(),
6837        });
6838
6839    let set_options: BTreeMap<_, _> = set_options_vec
6840        .clone()
6841        .into_iter()
6842        .map(|option| (option.name, option.value))
6843        .collect();
6844
6845    // Type check values + avoid duplicates; we don't want to e.g. let users
6846    // drop and set the same option in the same statement, so treating drops as
6847    // sets here is fine.
6848    let connection_options_extracted =
6849        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
6850
6851    let duplicates: Vec<_> = connection_options_extracted
6852        .seen
6853        .intersection(&drop_options)
6854        .collect();
6855
6856    if !duplicates.is_empty() {
6857        sql_bail!(
6858            "cannot both SET and DROP/RESET options {}",
6859            duplicates
6860                .iter()
6861                .map(|option| option.to_string())
6862                .join(", ")
6863        )
6864    }
6865
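    // For example (hypothetically, if BROKER and AWS PRIVATELINK formed a mutually exclusive
    // set): `ALTER CONNECTION conn SET (BROKER 'b:9092')` would also implicitly drop any
    // previously set AWS PRIVATELINK value via the logic below.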
6866    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
6867        let set_options_count = mutually_exclusive_options
6868            .iter()
6869            .filter(|o| set_options.contains_key(o))
6870            .count();
6871        let drop_options_count = mutually_exclusive_options
6872            .iter()
6873            .filter(|o| drop_options.contains(o))
6874            .count();
6875
6876        // Disallow setting _and_ resetting mutually exclusive options
6877        if set_options_count > 0 && drop_options_count > 0 {
6878            sql_bail!(
6879                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
6880                connection_type,
6881                mutually_exclusive_options
6882                    .iter()
6883                    .map(|option| option.to_string())
6884                    .join(", ")
6885            )
6886        }
6887
        // If any option in a mutually exclusive set is either set or dropped, drop every
        // option in that set behind the scenes. Users are disallowed from doing this
        // explicitly (see the check above), but it is the mechanism by which we overwrite
        // values elsewhere in the code.
6892        if set_options_count > 0 || drop_options_count > 0 {
6893            drop_options.extend(mutually_exclusive_options.iter().cloned());
6894        }
6895
6896        // n.b. if mutually exclusive options are set, those will error when we
6897        // try to replan the connection.
6898    }
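
    // As an illustration (the option names here are schematic rather than
    // those of any particular connection type): if `HOST` and `HOSTS` formed
    // one mutually exclusive set, then `ALTER CONNECTION conn SET (HOST 'a')`
    // would also add `HOSTS` to `drop_options` behind the scenes, so the new
    // `HOST` value cleanly replaces any previously set `HOSTS`.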

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

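/// Plans `ALTER SINK`. Only changing the relation a sink exports is planned
/// here; `SET`/`RESET` of other options is rejected as unsupported below. An
/// illustrative statement (the object names are hypothetical):
///
/// ```sql
/// ALTER SINK my_sink SET FROM new_input;
/// ```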
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always ALTER objects from their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // First, reconstruct the original CREATE SINK statement.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(mut stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Then find the existing version of the sink and increment it by one.
            let cur_version = stmt
                .with_options
                .extract_if(.., |o| o.name == CreateSinkOptionName::Version)
                .map(|o| u64::try_from_value(o.value).expect("invalid sink create_sql"))
                .max()
                .unwrap_or(0);
            let new_version = cur_version + 1;
            stmt.with_options.push(CreateSinkOption {
                name: CreateSinkOptionName::Version,
                value: Some(WithOptionValue::Value(Value::Number(
                    new_version.to_string(),
                ))),
            });

            // Next, resolve the statement's names and swap in the new `from` relation.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            // Finally, re-plan the modified CREATE SINK statement to verify that
            // the new configuration is valid.
            let Plan::CreateSink(plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET options"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // TODO: describe the statement's options here.
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

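/// Plans `ALTER SOURCE`. The only option that can be planned directly here is
/// `RETAIN HISTORY`; the remaining actions are either rejected or must be
/// handled during purification. Illustrative statements (the object names are
/// hypothetical):
///
/// ```sql
/// ALTER SOURCE my_source SET (RETAIN HISTORY FOR '2h');
/// ALTER SOURCE my_source RESET (RETAIN HISTORY);
/// ```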
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            // N.B. we use this statement in purification in a way that cannot
            // be planned directly.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE is no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

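/// Plans `ALTER SYSTEM SET`, which assigns a value to a system configuration
/// parameter. An illustrative statement (the parameter name is hypothetical):
///
/// ```sql
/// ALTER SYSTEM SET max_connections = 100;
/// ```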
pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

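/// Plans `ALTER SYSTEM RESET`, which restores a system configuration
/// parameter to its default value. An illustrative statement (the parameter
/// name is hypothetical):
///
/// ```sql
/// ALTER SYSTEM RESET max_connections;
/// ```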
pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

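/// Plans `ALTER ROLE`, which either updates a role's attributes or sets a
/// default for a session variable. An illustrative statement for the variable
/// form (the role and cluster names are hypothetical):
///
/// ```sql
/// ALTER ROLE data_team SET cluster = 'analytics';
/// ```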
pub fn plan_alter_role(
    _scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

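/// Plans `ALTER TABLE ... ADD COLUMN`, which is gated behind the
/// `ENABLE_ALTER_TABLE_ADD_COLUMN` feature flag. An illustrative statement
/// (the table and column names are hypothetical):
///
/// ```sql
/// ALTER TABLE orders ADD COLUMN IF NOT EXISTS note text;
/// ```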
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                // Always add columns to the latest version of the item.
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // TODO(alter_table): Support non-nullable columns with default values.
    let column_type = scalar_type.nullable(true);
    // "unresolve" our data type so we can later update the persisted create_sql.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

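/// Plans `COMMENT ON`, which attaches a comment to a catalog object or one of
/// its columns, or removes it again with `IS NULL`. Illustrative statements
/// (the object names are hypothetical):
///
/// ```sql
/// COMMENT ON TABLE orders IS 'all customer orders';
/// COMMENT ON COLUMN orders.note IS NULL;
/// ```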
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // TODO(parkmycar): Make max comment length configurable.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // Note: the `mz_comments` table uses an `Int4` for the column position,
    // but in catalog storage we store a `usize`, which would be a `UInt8`. We
    // check here that the position fits in an `i32` because this is the
    // easiest place to raise an error.
    //
    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

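/// Resolves a cluster by name, returning `Ok(None)` instead of an error when
/// the cluster does not exist and `if_exists` is true, mirroring the
/// `IF EXISTS` semantics of the calling statements.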
pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

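/// Resolves a `cluster.replica` pair to the cluster and the replica's ID. As
/// with [`resolve_cluster`], a missing cluster or replica yields `Ok(None)`
/// when `if_exists` is true.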
pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

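/// Resolves a database by name, returning `Ok(None)` when it does not exist
/// and `if_exists` is true.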
pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

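/// Resolves a schema name to its database and schema specifiers, returning
/// `Ok(None)` when the schema does not exist and `if_exists` is true.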
pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

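/// Resolves a network policy by name, returning `Ok(None)` when it does not
/// exist and `if_exists` is true.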
pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

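/// Resolves an item (or, when `object_type` is `ObjectType::Type`, a type) by
/// name and verifies that it has the expected object type, returning
/// `Ok(None)` when it does not exist and `if_exists` is true. For example,
/// resolving a sink name that actually refers to a table produces
/// [`PlanError::MismatchedObjectType`].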
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns an error if the given cluster is a managed cluster.
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}