mz_sql/plan/statement/
ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL).
11//!
12//! This module houses the handlers for statements that modify the catalog, like
13//! `ALTER`, `CREATE`, and `DROP`.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::{Either, Itertools};
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::soft_panic_or_log;
32use mz_ore::str::StrExt;
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43    preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
59    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
60    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
61    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
62    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
63    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
64    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
65    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
66    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
67    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
68    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
69    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
70    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
71    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
72    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
73    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91    SinkEnvelope, StorageSinkConnection,
92};
93use mz_storage_types::sources::encoding::{
94    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95    SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111    PostgresSourceConnection, PostgresSourcePublicationDetails,
112    ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121    SqlServerSourceExtras, Timeline,
122};
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
141    scalar_type_from_sql,
142};
143use crate::plan::scope::Scope;
144use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
145use crate::plan::statement::{StatementContext, StatementDesc, scl};
146use crate::plan::typeconv::CastContext;
147use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
148use crate::plan::{
149    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
150    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
151    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
152    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
153    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
154    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
155    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
156    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
157    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
158    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
159    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
160    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
161    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
162    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
163    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
164    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
165    WebhookValidation, literal, plan_utils, query, transform_ast,
166};
167use crate::session::vars::{
168    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
169    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
170};
171use crate::{names, parse};
172
173mod connection;
174
175// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
176//
177// The real max is probably higher than this, but it's easier to relax a constraint than make it
178// more strict.
179const MAX_NUM_COLUMNS: usize = 256;
180
181static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
182    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
183
184/// Given a relation desc and a column list, checks that:
185/// - the column list is a prefix of the desc;
186/// - all the listed columns are types that have meaningful Persist-level ordering.
187fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
188    if partition_by.len() > desc.len() {
189        tracing::error!(
190            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
191            desc.len(),
192            partition_by.len()
193        );
194        partition_by.truncate(desc.len());
195    }
196
197    let desc_prefix = desc.iter().take(partition_by.len());
198    for (idx, ((desc_name, desc_type), partition_name)) in
199        desc_prefix.zip_eq(partition_by).enumerate()
200    {
201        let partition_name = normalize::column_name(partition_name);
202        if *desc_name != partition_name {
203            sql_bail!(
204                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
205            );
206        }
207        if !preserves_order(&desc_type.scalar_type) {
208            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
209        }
210    }
211    Ok(())
212}
213
214pub fn describe_create_database(
215    _: &StatementContext,
216    _: CreateDatabaseStatement,
217) -> Result<StatementDesc, PlanError> {
218    Ok(StatementDesc::new(None))
219}
220
221pub fn plan_create_database(
222    _: &StatementContext,
223    CreateDatabaseStatement {
224        name,
225        if_not_exists,
226    }: CreateDatabaseStatement,
227) -> Result<Plan, PlanError> {
228    Ok(Plan::CreateDatabase(CreateDatabasePlan {
229        name: normalize::ident(name.0),
230        if_not_exists,
231    }))
232}
233
234pub fn describe_create_schema(
235    _: &StatementContext,
236    _: CreateSchemaStatement,
237) -> Result<StatementDesc, PlanError> {
238    Ok(StatementDesc::new(None))
239}
240
241pub fn plan_create_schema(
242    scx: &StatementContext,
243    CreateSchemaStatement {
244        mut name,
245        if_not_exists,
246    }: CreateSchemaStatement,
247) -> Result<Plan, PlanError> {
248    if name.0.len() > 2 {
249        sql_bail!("schema name {} has more than two components", name);
250    }
251    let schema_name = normalize::ident(
252        name.0
253            .pop()
254            .expect("names always have at least one component"),
255    );
256    let database_spec = match name.0.pop() {
257        None => match scx.catalog.active_database() {
258            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
259            None => sql_bail!("no database specified and no active database"),
260        },
261        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
262            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
263            Err(_) => sql_bail!("invalid database {}", n.as_str()),
264        },
265    };
266    Ok(Plan::CreateSchema(CreateSchemaPlan {
267        database_spec,
268        schema_name,
269        if_not_exists,
270    }))
271}
272
273pub fn describe_create_table(
274    _: &StatementContext,
275    _: CreateTableStatement<Aug>,
276) -> Result<StatementDesc, PlanError> {
277    Ok(StatementDesc::new(None))
278}
279
280pub fn plan_create_table(
281    scx: &StatementContext,
282    stmt: CreateTableStatement<Aug>,
283) -> Result<Plan, PlanError> {
284    let CreateTableStatement {
285        name,
286        columns,
287        constraints,
288        if_not_exists,
289        temporary,
290        with_options,
291    } = &stmt;
292
293    let names: Vec<_> = columns
294        .iter()
295        .filter(|c| {
296            // This set of `names` is used to create the initial RelationDesc.
297            // Columns that have been added at later versions of the table will
298            // get added further below.
299            let is_versioned = c
300                .options
301                .iter()
302                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
303            !is_versioned
304        })
305        .map(|c| normalize::column_name(c.name.clone()))
306        .collect();
307
308    if let Some(dup) = names.iter().duplicates().next() {
309        sql_bail!("column {} specified more than once", dup.quoted());
310    }
311
312    // Build initial relation type that handles declared data types
313    // and NOT NULL constraints.
314    let mut column_types = Vec::with_capacity(columns.len());
315    let mut defaults = Vec::with_capacity(columns.len());
316    let mut changes = BTreeMap::new();
317    let mut keys = Vec::new();
318
319    for (i, c) in columns.into_iter().enumerate() {
320        let aug_data_type = &c.data_type;
321        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
322        let mut nullable = true;
323        let mut default = Expr::null();
324        let mut versioned = false;
325        for option in &c.options {
326            match &option.option {
327                ColumnOption::NotNull => nullable = false,
328                ColumnOption::Default(expr) => {
329                    // Ensure expression can be planned and yields the correct
330                    // type.
331                    let mut expr = expr.clone();
332                    transform_ast::transform(scx, &mut expr)?;
333                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
334                    default = expr.clone();
335                }
336                ColumnOption::Unique { is_primary } => {
337                    keys.push(vec![i]);
338                    if *is_primary {
339                        nullable = false;
340                    }
341                }
342                ColumnOption::Versioned { action, version } => {
343                    let version = RelationVersion::from(*version);
344                    versioned = true;
345
346                    let name = normalize::column_name(c.name.clone());
347                    let typ = ty.clone().nullable(nullable);
348
349                    changes.insert(version, (action.clone(), name, typ));
350                }
351                other => {
352                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
353                }
354            }
355        }
356        // TODO(alter_table): This assumes all versioned columns are at the
357        // end. This will no longer be true when we support dropping columns.
358        if !versioned {
359            column_types.push(ty.nullable(nullable));
360        }
361        defaults.push(default);
362    }
363
364    let mut seen_primary = false;
365    'c: for constraint in constraints {
366        match constraint {
367            TableConstraint::Unique {
368                name: _,
369                columns,
370                is_primary,
371                nulls_not_distinct,
372            } => {
373                if seen_primary && *is_primary {
374                    sql_bail!(
375                        "multiple primary keys for table {} are not allowed",
376                        name.to_ast_string_stable()
377                    );
378                }
379                seen_primary = *is_primary || seen_primary;
380
381                let mut key = vec![];
382                for column in columns {
383                    let column = normalize::column_name(column.clone());
384                    match names.iter().position(|name| *name == column) {
385                        None => sql_bail!("unknown column in constraint: {}", column),
386                        Some(i) => {
387                            let nullable = &mut column_types[i].nullable;
388                            if *is_primary {
389                                if *nulls_not_distinct {
390                                    sql_bail!(
391                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
392                                    );
393                                }
394
395                                *nullable = false;
396                            } else if !(*nulls_not_distinct || !*nullable) {
397                                // Non-primary key unique constraints are only keys if all of their
398                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
399                                break 'c;
400                            }
401
402                            key.push(i);
403                        }
404                    }
405                }
406
407                if *is_primary {
408                    keys.insert(0, key);
409                } else {
410                    keys.push(key);
411                }
412            }
413            TableConstraint::ForeignKey { .. } => {
414                // Foreign key constraints are not presently enforced. We allow
415                // them with feature flags for sqllogictest's sake.
416                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
417            }
418            TableConstraint::Check { .. } => {
419                // Check constraints are not presently enforced. We allow them
420                // with feature flags for sqllogictest's sake.
421                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
422            }
423        }
424    }
425
426    if !keys.is_empty() {
427        // Unique constraints are not presently enforced. We allow them with feature flags for
428        // sqllogictest's sake.
429        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
430    }
431
432    let typ = SqlRelationType::new(column_types).with_keys(keys);
433
434    let temporary = *temporary;
435    let name = if temporary {
436        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
437    } else {
438        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
439    };
440
441    // Check for an object in the catalog with this same name
442    let full_name = scx.catalog.resolve_full_name(&name);
443    let partial_name = PartialItemName::from(full_name.clone());
444    // For PostgreSQL compatibility, we need to prevent creating tables when
445    // there is an existing object *or* type of the same name.
446    if let (false, Ok(item)) = (
447        if_not_exists,
448        scx.catalog.resolve_item_or_type(&partial_name),
449    ) {
450        return Err(PlanError::ItemAlreadyExists {
451            name: full_name.to_string(),
452            item_type: item.item_type(),
453        });
454    }
455
456    let desc = RelationDesc::new(typ, names);
457    let mut desc = VersionedRelationDesc::new(desc);
458    for (version, (_action, name, typ)) in changes.into_iter() {
459        let new_version = desc.add_column(name, typ);
460        if version != new_version {
461            return Err(PlanError::InvalidTable {
462                name: full_name.item,
463            });
464        }
465    }
466
467    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
468
469    // Table options should only consider the original columns, since those
470    // were the only ones in scope when the table was created.
471    //
472    // TODO(alter_table): Will need to reconsider this when we support ALTERing
473    // the PARTITION BY columns.
474    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
475    let options = plan_table_options(scx, &original_desc, with_options.clone())?;
476
477    let compaction_window = options.iter().find_map(|o| {
478        #[allow(irrefutable_let_patterns)]
479        if let crate::plan::TableOption::RetainHistory(lcw) = o {
480            Some(lcw.clone())
481        } else {
482            None
483        }
484    });
485
486    let table = Table {
487        create_sql,
488        desc,
489        temporary,
490        compaction_window,
491        data_source: TableDataSource::TableWrites { defaults },
492    };
493    Ok(Plan::CreateTable(CreateTablePlan {
494        name,
495        table,
496        if_not_exists: *if_not_exists,
497    }))
498}
499
500pub fn describe_create_table_from_source(
501    _: &StatementContext,
502    _: CreateTableFromSourceStatement<Aug>,
503) -> Result<StatementDesc, PlanError> {
504    Ok(StatementDesc::new(None))
505}
506
507pub fn describe_create_webhook_source(
508    _: &StatementContext,
509    _: CreateWebhookSourceStatement<Aug>,
510) -> Result<StatementDesc, PlanError> {
511    Ok(StatementDesc::new(None))
512}
513
514pub fn describe_create_source(
515    _: &StatementContext,
516    _: CreateSourceStatement<Aug>,
517) -> Result<StatementDesc, PlanError> {
518    Ok(StatementDesc::new(None))
519}
520
521pub fn describe_create_subsource(
522    _: &StatementContext,
523    _: CreateSubsourceStatement<Aug>,
524) -> Result<StatementDesc, PlanError> {
525    Ok(StatementDesc::new(None))
526}
527
528generate_extracted_config!(
529    CreateSourceOption,
530    (TimestampInterval, Duration),
531    (RetainHistory, OptionalDuration)
532);
533
534generate_extracted_config!(
535    PgConfigOption,
536    (Details, String),
537    (Publication, String),
538    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
539    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
540);
541
542generate_extracted_config!(
543    MySqlConfigOption,
544    (Details, String),
545    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
546    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
547);
548
549generate_extracted_config!(
550    SqlServerConfigOption,
551    (Details, String),
552    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
553    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
554);
555
556pub fn plan_create_webhook_source(
557    scx: &StatementContext,
558    mut stmt: CreateWebhookSourceStatement<Aug>,
559) -> Result<Plan, PlanError> {
560    if stmt.is_table {
561        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
562    }
563
564    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
565    // we plan to normalize when we canonicalize the create statement.
566    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
567    let enable_multi_replica_sources =
568        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
569    if !enable_multi_replica_sources {
570        if in_cluster.replica_ids().len() > 1 {
571            sql_bail!("cannot create webhook source in cluster with more than one replica")
572        }
573    }
574    let create_sql =
575        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
576
577    let CreateWebhookSourceStatement {
578        name,
579        if_not_exists,
580        body_format,
581        include_headers,
582        validate_using,
583        is_table,
584        // We resolved `in_cluster` above, so we want to ignore it here.
585        in_cluster: _,
586    } = stmt;
587
588    let validate_using = validate_using
589        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
590        .transpose()?;
591    if let Some(WebhookValidation { expression, .. }) = &validate_using {
592        // If the validation expression doesn't reference any part of the request, then we should
593        // return an error because it's almost definitely wrong.
594        if !expression.contains_column() {
595            return Err(PlanError::WebhookValidationDoesNotUseColumns);
596        }
597        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
598        // allow calls to `now()` because some webhook providers recommend rejecting requests that
599        // are older than a certain threshold.
600        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
601            return Err(PlanError::WebhookValidationNonDeterministic);
602        }
603    }
604
605    let body_format = match body_format {
606        Format::Bytes => WebhookBodyFormat::Bytes,
607        Format::Json { array } => WebhookBodyFormat::Json { array },
608        Format::Text => WebhookBodyFormat::Text,
609        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
610        ty => {
611            return Err(PlanError::Unsupported {
612                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
613                discussion_no: None,
614            });
615        }
616    };
617
618    let mut column_ty = vec![
619        // Always include the body of the request as the first column.
620        SqlColumnType {
621            scalar_type: SqlScalarType::from(body_format),
622            nullable: false,
623        },
624    ];
625    let mut column_names = vec!["body".to_string()];
626
627    let mut headers = WebhookHeaders::default();
628
629    // Include a `headers` column, possibly filtered.
630    if let Some(filters) = include_headers.column {
631        column_ty.push(SqlColumnType {
632            scalar_type: SqlScalarType::Map {
633                value_type: Box::new(SqlScalarType::String),
634                custom_id: None,
635            },
636            nullable: false,
637        });
638        column_names.push("headers".to_string());
639
640        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
641            filters.into_iter().partition_map(|filter| {
642                if filter.block {
643                    itertools::Either::Right(filter.header_name)
644                } else {
645                    itertools::Either::Left(filter.header_name)
646                }
647            });
648        headers.header_column = Some(WebhookHeaderFilters { allow, block });
649    }
650
651    // Map headers to specific columns.
652    for header in include_headers.mappings {
653        let scalar_type = header
654            .use_bytes
655            .then_some(SqlScalarType::Bytes)
656            .unwrap_or(SqlScalarType::String);
657        column_ty.push(SqlColumnType {
658            scalar_type,
659            nullable: true,
660        });
661        column_names.push(header.column_name.into_string());
662
663        let column_idx = column_ty.len() - 1;
664        // Double check we're consistent with column names.
665        assert_eq!(
666            column_idx,
667            column_names.len() - 1,
668            "header column names and types don't match"
669        );
670        headers
671            .mapped_headers
672            .insert(column_idx, (header.header_name, header.use_bytes));
673    }
674
675    // Validate our columns.
676    let mut unique_check = HashSet::with_capacity(column_names.len());
677    for name in &column_names {
678        if !unique_check.insert(name) {
679            return Err(PlanError::AmbiguousColumn(name.clone().into()));
680        }
681    }
682    if column_names.len() > MAX_NUM_COLUMNS {
683        return Err(PlanError::TooManyColumns {
684            max_num_columns: MAX_NUM_COLUMNS,
685            req_num_columns: column_names.len(),
686        });
687    }
688
689    let typ = SqlRelationType::new(column_ty);
690    let desc = RelationDesc::new(typ, column_names);
691
692    // Check for an object in the catalog with this same name
693    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
694    let full_name = scx.catalog.resolve_full_name(&name);
695    let partial_name = PartialItemName::from(full_name.clone());
696    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
697        return Err(PlanError::ItemAlreadyExists {
698            name: full_name.to_string(),
699            item_type: item.item_type(),
700        });
701    }
702
703    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
704    // such, we always use a default of EpochMilliseconds.
705    let timeline = Timeline::EpochMilliseconds;
706
707    let plan = if is_table {
708        let data_source = DataSourceDesc::Webhook {
709            validate_using,
710            body_format,
711            headers,
712            cluster_id: Some(in_cluster.id()),
713        };
714        let data_source = TableDataSource::DataSource {
715            desc: data_source,
716            timeline,
717        };
718        Plan::CreateTable(CreateTablePlan {
719            name,
720            if_not_exists,
721            table: Table {
722                create_sql,
723                desc: VersionedRelationDesc::new(desc),
724                temporary: false,
725                compaction_window: None,
726                data_source,
727            },
728        })
729    } else {
730        let data_source = DataSourceDesc::Webhook {
731            validate_using,
732            body_format,
733            headers,
734            // Important: The cluster is set at the `Source` level.
735            cluster_id: None,
736        };
737        Plan::CreateSource(CreateSourcePlan {
738            name,
739            source: Source {
740                create_sql,
741                data_source,
742                desc,
743                compaction_window: None,
744            },
745            if_not_exists,
746            timeline,
747            in_cluster: Some(in_cluster.id()),
748        })
749    };
750
751    Ok(plan)
752}
753
754pub fn plan_create_source(
755    scx: &StatementContext,
756    mut stmt: CreateSourceStatement<Aug>,
757) -> Result<Plan, PlanError> {
758    let CreateSourceStatement {
759        name,
760        in_cluster: _,
761        col_names,
762        connection: source_connection,
763        envelope,
764        if_not_exists,
765        format,
766        key_constraint,
767        include_metadata,
768        with_options,
769        external_references: referenced_subsources,
770        progress_subsource,
771    } = &stmt;
772
773    mz_ore::soft_assert_or_log!(
774        referenced_subsources.is_none(),
775        "referenced subsources must be cleared in purification"
776    );
777
778    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
779        && scx.catalog.system_vars().force_source_table_syntax();
780
781    // If the new source table syntax is forced all the options related to the primary
782    // source output should be un-set.
783    if force_source_table_syntax {
784        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
785            Err(PlanError::UseTablesForSources(
786                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
787            ))?;
788        }
789    }
790
791    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
792
793    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
794        && include_metadata
795            .iter()
796            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
797    {
798        // TODO(guswynn): should this be `bail_unsupported!`?
799        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
800    }
801    if !matches!(
802        source_connection,
803        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
804    ) && !include_metadata.is_empty()
805    {
806        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
807    }
808
809    if !include_metadata.is_empty()
810        && !matches!(
811            envelope,
812            ast::SourceEnvelope::Upsert { .. }
813                | ast::SourceEnvelope::None
814                | ast::SourceEnvelope::Debezium
815        )
816    {
817        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
818    }
819
820    let external_connection =
821        plan_generic_source_connection(scx, source_connection, include_metadata)?;
822
823    let CreateSourceOptionExtracted {
824        timestamp_interval,
825        retain_history,
826        seen: _,
827    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;
828
829    let metadata_columns_desc = match external_connection {
830        GenericSourceConnection::Kafka(KafkaSourceConnection {
831            ref metadata_columns,
832            ..
833        }) => kafka_metadata_columns_desc(metadata_columns),
834        _ => vec![],
835    };
836
837    // Generate the relation description for the primary export of the source.
838    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
839        scx,
840        &envelope,
841        format,
842        Some(external_connection.default_key_desc()),
843        external_connection.default_value_desc(),
844        include_metadata,
845        metadata_columns_desc,
846        &external_connection,
847    )?;
848    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;
849
850    let names: Vec<_> = desc.iter_names().cloned().collect();
851    if let Some(dup) = names.iter().duplicates().next() {
852        sql_bail!("column {} specified more than once", dup.quoted());
853    }
854
855    // Apply user-specified key constraint
856    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
857        // Don't remove this without addressing
858        // https://github.com/MaterializeInc/database-issues/issues/4371.
859        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;
860
861        let key_columns = columns
862            .into_iter()
863            .map(normalize::column_name)
864            .collect::<Vec<_>>();
865
866        let mut uniq = BTreeSet::new();
867        for col in key_columns.iter() {
868            if !uniq.insert(col) {
869                sql_bail!("Repeated column name in source key constraint: {}", col);
870            }
871        }
872
873        let key_indices = key_columns
874            .iter()
875            .map(|col| {
876                let name_idx = desc
877                    .get_by_name(col)
878                    .map(|(idx, _type)| idx)
879                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
880                if desc.get_unambiguous_name(name_idx).is_none() {
881                    sql_bail!("Ambiguous column in source key constraint: {}", col);
882                }
883                Ok(name_idx)
884            })
885            .collect::<Result<Vec<_>, _>>()?;
886
887        if !desc.typ().keys.is_empty() {
888            return Err(key_constraint_err(&desc, &key_columns));
889        } else {
890            desc = desc.with_key(key_indices);
891        }
892    }
893
894    let timestamp_interval = match timestamp_interval {
895        Some(duration) => {
896            let min = scx.catalog.system_vars().min_timestamp_interval();
897            let max = scx.catalog.system_vars().max_timestamp_interval();
898            if duration < min || duration > max {
899                return Err(PlanError::InvalidTimestampInterval {
900                    min,
901                    max,
902                    requested: duration,
903                });
904            }
905            duration
906        }
907        None => scx.catalog.config().timestamp_interval,
908    };
909
910    let (desc, data_source) = match progress_subsource {
911        Some(name) => {
912            let DeferredItemName::Named(name) = name else {
913                sql_bail!("[internal error] progress subsource must be named during purification");
914            };
915            let ResolvedItemName::Item { id, .. } = name else {
916                sql_bail!("[internal error] invalid target id");
917            };
918
919            let details = match external_connection {
920                GenericSourceConnection::Kafka(ref c) => {
921                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
922                        metadata_columns: c.metadata_columns.clone(),
923                    })
924                }
925                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
926                    LoadGenerator::Auction
927                    | LoadGenerator::Marketing
928                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
929                    LoadGenerator::Counter { .. }
930                    | LoadGenerator::Clock
931                    | LoadGenerator::Datums
932                    | LoadGenerator::KeyValue(_) => {
933                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
934                            output: LoadGeneratorOutput::Default,
935                        })
936                    }
937                },
938                GenericSourceConnection::Postgres(_)
939                | GenericSourceConnection::MySql(_)
940                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
941            };
942
943            let data_source = DataSourceDesc::OldSyntaxIngestion {
944                desc: SourceDesc {
945                    connection: external_connection,
946                    timestamp_interval,
947                },
948                progress_subsource: *id,
949                data_config: SourceExportDataConfig {
950                    encoding,
951                    envelope: envelope.clone(),
952                },
953                details,
954            };
955            (desc, data_source)
956        }
957        None => {
958            let desc = external_connection.timestamp_desc();
959            let data_source = DataSourceDesc::Ingestion(SourceDesc {
960                connection: external_connection,
961                timestamp_interval,
962            });
963            (desc, data_source)
964        }
965    };
966
967    let if_not_exists = *if_not_exists;
968    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
969
970    // Check for an object in the catalog with this same name
971    let full_name = scx.catalog.resolve_full_name(&name);
972    let partial_name = PartialItemName::from(full_name.clone());
973    // For PostgreSQL compatibility, we need to prevent creating sources when
974    // there is an existing object *or* type of the same name.
975    if let (false, Ok(item)) = (
976        if_not_exists,
977        scx.catalog.resolve_item_or_type(&partial_name),
978    ) {
979        return Err(PlanError::ItemAlreadyExists {
980            name: full_name.to_string(),
981            item_type: item.item_type(),
982        });
983    }
984
985    // We will rewrite the cluster if one is not provided, so we must use the
986    // `in_cluster` value we plan to normalize when we canonicalize the create
987    // statement.
988    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
989
990    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;
991
992    // Determine a default timeline for the source.
993    let timeline = match envelope {
994        SourceEnvelope::CdcV2 => {
995            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
996        }
997        _ => Timeline::EpochMilliseconds,
998    };
999
1000    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1001
1002    let source = Source {
1003        create_sql,
1004        data_source,
1005        desc,
1006        compaction_window,
1007    };
1008
1009    Ok(Plan::CreateSource(CreateSourcePlan {
1010        name,
1011        source,
1012        if_not_exists,
1013        timeline,
1014        in_cluster: Some(in_cluster.id()),
1015    }))
1016}
1017
1018pub fn plan_generic_source_connection(
1019    scx: &StatementContext<'_>,
1020    source_connection: &CreateSourceConnection<Aug>,
1021    include_metadata: &Vec<SourceIncludeMetadata>,
1022) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1023    Ok(match source_connection {
1024        CreateSourceConnection::Kafka {
1025            connection,
1026            options,
1027        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1028            scx,
1029            connection,
1030            options,
1031            include_metadata,
1032        )?),
1033        CreateSourceConnection::Postgres {
1034            connection,
1035            options,
1036        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1037            scx, connection, options,
1038        )?),
1039        CreateSourceConnection::SqlServer {
1040            connection,
1041            options,
1042        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1043            scx, connection, options,
1044        )?),
1045        CreateSourceConnection::MySql {
1046            connection,
1047            options,
1048        } => {
1049            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1050        }
1051        CreateSourceConnection::LoadGenerator { generator, options } => {
1052            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1053                scx,
1054                generator,
1055                options,
1056                include_metadata,
1057            )?)
1058        }
1059    })
1060}
1061
1062fn plan_load_generator_source_connection(
1063    scx: &StatementContext<'_>,
1064    generator: &ast::LoadGenerator,
1065    options: &Vec<LoadGeneratorOption<Aug>>,
1066    include_metadata: &Vec<SourceIncludeMetadata>,
1067) -> Result<LoadGeneratorSourceConnection, PlanError> {
1068    let load_generator =
1069        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1070    let LoadGeneratorOptionExtracted {
1071        tick_interval,
1072        as_of,
1073        up_to,
1074        ..
1075    } = options.clone().try_into()?;
1076    let tick_micros = match tick_interval {
1077        Some(interval) => Some(interval.as_micros().try_into()?),
1078        None => None,
1079    };
1080    if up_to < as_of {
1081        sql_bail!("UP TO cannot be less than AS OF");
1082    }
1083    Ok(LoadGeneratorSourceConnection {
1084        load_generator,
1085        tick_micros,
1086        as_of,
1087        up_to,
1088    })
1089}
1090
1091fn plan_mysql_source_connection(
1092    scx: &StatementContext<'_>,
1093    connection: &ResolvedItemName,
1094    options: &Vec<MySqlConfigOption<Aug>>,
1095) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1096    let connection_item = scx.get_item_by_resolved_name(connection)?;
1097    match connection_item.connection()? {
1098        Connection::MySql(connection) => connection,
1099        _ => sql_bail!(
1100            "{} is not a MySQL connection",
1101            scx.catalog.resolve_full_name(connection_item.name())
1102        ),
1103    };
1104    let MySqlConfigOptionExtracted {
1105        details,
1106        // text/exclude columns are already part of the source-exports and are only included
1107        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1108        // be removed once we drop support for implicitly created subsources.
1109        text_columns: _,
1110        exclude_columns: _,
1111        seen: _,
1112    } = options.clone().try_into()?;
1113    let details = details
1114        .as_ref()
1115        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
1116    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1117    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1118    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1119    Ok(MySqlSourceConnection {
1120        connection: connection_item.id(),
1121        connection_id: connection_item.id(),
1122        details,
1123    })
1124}
1125
1126fn plan_sqlserver_source_connection(
1127    scx: &StatementContext<'_>,
1128    connection: &ResolvedItemName,
1129    options: &Vec<SqlServerConfigOption<Aug>>,
1130) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1131    let connection_item = scx.get_item_by_resolved_name(connection)?;
1132    match connection_item.connection()? {
1133        Connection::SqlServer(connection) => connection,
1134        _ => sql_bail!(
1135            "{} is not a SQL Server connection",
1136            scx.catalog.resolve_full_name(connection_item.name())
1137        ),
1138    };
1139    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1140    let details = details
1141        .as_ref()
1142        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
1143    let extras = hex::decode(details)
1144        .map_err(|e| sql_err!("{e}"))
1145        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1146        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1147    Ok(SqlServerSourceConnection {
1148        connection_id: connection_item.id(),
1149        connection: connection_item.id(),
1150        extras,
1151    })
1152}
1153
1154fn plan_postgres_source_connection(
1155    scx: &StatementContext<'_>,
1156    connection: &ResolvedItemName,
1157    options: &Vec<PgConfigOption<Aug>>,
1158) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1159    let connection_item = scx.get_item_by_resolved_name(connection)?;
1160    let PgConfigOptionExtracted {
1161        details,
1162        publication,
1163        // text columns are already part of the source-exports and are only included
1164        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1165        // be removed once we drop support for implicitly created subsources.
1166        text_columns: _,
1167        // exclude columns are already part of the source-exports and are only included
1168        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1169        // be removed once we drop support for implicitly created subsources.
1170        exclude_columns: _,
1171        seen: _,
1172    } = options.clone().try_into()?;
1173    let details = details
1174        .as_ref()
1175        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
1176    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1177    let details =
1178        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1179    let publication_details =
1180        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1181    Ok(PostgresSourceConnection {
1182        connection: connection_item.id(),
1183        connection_id: connection_item.id(),
1184        publication: publication.expect("validated exists during purification"),
1185        publication_details,
1186    })
1187}
1188
1189fn plan_kafka_source_connection(
1190    scx: &StatementContext<'_>,
1191    connection_name: &ResolvedItemName,
1192    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1193    include_metadata: &Vec<SourceIncludeMetadata>,
1194) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1195    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1196    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1197        sql_bail!(
1198            "{} is not a kafka connection",
1199            scx.catalog.resolve_full_name(connection_item.name())
1200        )
1201    }
1202    let KafkaSourceConfigOptionExtracted {
1203        group_id_prefix,
1204        topic,
1205        topic_metadata_refresh_interval,
1206        start_timestamp: _, // purified into `start_offset`
1207        start_offset,
1208        seen: _,
1209    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1210    let topic = topic.expect("validated exists during purification");
1211    let mut start_offsets = BTreeMap::new();
1212    if let Some(offsets) = start_offset {
1213        for (part, offset) in offsets.iter().enumerate() {
1214            if *offset < 0 {
1215                sql_bail!("START OFFSET must be a nonnegative integer");
1216            }
1217            start_offsets.insert(i32::try_from(part)?, *offset);
1218        }
1219    }
1220    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1221        // This is a librdkafka-enforced restriction that, if violated,
1222        // would result in a runtime error for the source.
1223        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1224    }
1225    let metadata_columns = include_metadata
1226        .into_iter()
1227        .flat_map(|item| match item {
1228            SourceIncludeMetadata::Timestamp { alias } => {
1229                let name = match alias {
1230                    Some(name) => name.to_string(),
1231                    None => "timestamp".to_owned(),
1232                };
1233                Some((name, KafkaMetadataKind::Timestamp))
1234            }
1235            SourceIncludeMetadata::Partition { alias } => {
1236                let name = match alias {
1237                    Some(name) => name.to_string(),
1238                    None => "partition".to_owned(),
1239                };
1240                Some((name, KafkaMetadataKind::Partition))
1241            }
1242            SourceIncludeMetadata::Offset { alias } => {
1243                let name = match alias {
1244                    Some(name) => name.to_string(),
1245                    None => "offset".to_owned(),
1246                };
1247                Some((name, KafkaMetadataKind::Offset))
1248            }
1249            SourceIncludeMetadata::Headers { alias } => {
1250                let name = match alias {
1251                    Some(name) => name.to_string(),
1252                    None => "headers".to_owned(),
1253                };
1254                Some((name, KafkaMetadataKind::Headers))
1255            }
1256            SourceIncludeMetadata::Header {
1257                alias,
1258                key,
1259                use_bytes,
1260            } => Some((
1261                alias.to_string(),
1262                KafkaMetadataKind::Header {
1263                    key: key.clone(),
1264                    use_bytes: *use_bytes,
1265                },
1266            )),
1267            SourceIncludeMetadata::Key { .. } => {
1268                // handled below
1269                None
1270            }
1271        })
1272        .collect();
1273    Ok(KafkaSourceConnection {
1274        connection: connection_item.id(),
1275        connection_id: connection_item.id(),
1276        topic,
1277        start_offsets,
1278        group_id_prefix,
1279        topic_metadata_refresh_interval,
1280        metadata_columns,
1281    })
1282}
1283
1284fn apply_source_envelope_encoding(
1285    scx: &StatementContext,
1286    envelope: &ast::SourceEnvelope,
1287    format: &Option<FormatSpecifier<Aug>>,
1288    key_desc: Option<RelationDesc>,
1289    value_desc: RelationDesc,
1290    include_metadata: &[SourceIncludeMetadata],
1291    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
1292    source_connection: &GenericSourceConnection<ReferencedConnection>,
1293) -> Result<
1294    (
1295        RelationDesc,
1296        SourceEnvelope,
1297        Option<SourceDataEncoding<ReferencedConnection>>,
1298    ),
1299    PlanError,
1300> {
1301    let encoding = match format {
1302        Some(format) => Some(get_encoding(scx, format, envelope)?),
1303        None => None,
1304    };
1305
1306    let (key_desc, value_desc) = match &encoding {
1307        Some(encoding) => {
1308            // If we are applying an encoding we need to ensure that the incoming value_desc is a
1309            // single column of type bytes.
1310            match value_desc.typ().columns() {
1311                [typ] => match typ.scalar_type {
1312                    SqlScalarType::Bytes => {}
1313                    _ => sql_bail!(
1314                        "The schema produced by the source is incompatible with format decoding"
1315                    ),
1316                },
1317                _ => sql_bail!(
1318                    "The schema produced by the source is incompatible with format decoding"
1319                ),
1320            }
1321
1322            let (key_desc, value_desc) = encoding.desc()?;
1323
1324            // TODO(petrosagg): This piece of code seems to be making a statement about the
1325            // nullability of the NONE envelope when the source is Kafka. As written, the code
1326            // misses opportunities to mark columns as not nullable and is over conservative. For
1327            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
1328            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
1329            // should be replaced with precise type-level reasoning.
1330            let key_desc = key_desc.map(|desc| {
1331                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1332                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1333                if is_kafka && is_envelope_none {
1334                    RelationDesc::from_names_and_types(
1335                        desc.into_iter()
1336                            .map(|(name, typ)| (name, typ.nullable(true))),
1337                    )
1338                } else {
1339                    desc
1340                }
1341            });
1342            (key_desc, value_desc)
1343        }
1344        None => (key_desc, value_desc),
1345    };
1346
1347    // KEY VALUE load generators are the only UPSERT source that
1348    // has no encoding but defaults to `INCLUDE KEY`.
1349    //
1350    // As discussed
1351    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1352    // removing this special case amounts to deciding how to handle null keys
1353    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
1354    // this special case in.
1355    //
1356    // Note that this is safe because this generator is
1357    // 1. The only source with no encoding that can have its key included.
1358    // 2. Never produces null keys (or values, for that matter).
1359    let key_envelope_no_encoding = matches!(
1360        source_connection,
1361        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1362            load_generator: LoadGenerator::KeyValue(_),
1363            ..
1364        })
1365    );
1366    let mut key_envelope = get_key_envelope(
1367        include_metadata,
1368        encoding.as_ref(),
1369        key_envelope_no_encoding,
1370    )?;
1371
1372    match (&envelope, &key_envelope) {
1373        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1374        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1375            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1376        ),
1377        _ => {}
1378    };
1379
1380    // Not all source envelopes are compatible with all source connections.
1381    // Whoever constructs the source ingestion pipeline is responsible for
1382    // choosing compatible envelopes and connections.
1383    //
1384    // TODO(guswynn): ambiguously assert which connections and envelopes are
1385    // compatible in typechecking
1386    //
1387    // TODO: remove bails as more support for upsert is added.
1388    let envelope = match &envelope {
1389        // TODO: fixup key envelope
1390        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1391        ast::SourceEnvelope::Debezium => {
1392            //TODO check that key envelope is not set
1393            let after_idx = match typecheck_debezium(&value_desc) {
1394                Ok((_before_idx, after_idx)) => Ok(after_idx),
1395                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1396                    Some(DataEncoding::Avro(_)) => Err(type_err),
1397                    _ => Err(sql_err!(
1398                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1399                    )),
1400                },
1401            }?;
1402
1403            UnplannedSourceEnvelope::Upsert {
1404                style: UpsertStyle::Debezium { after_idx },
1405            }
1406        }
1407        ast::SourceEnvelope::Upsert {
1408            value_decode_err_policy,
1409        } => {
1410            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1411                None => {
1412                    if !key_envelope_no_encoding {
1413                        bail_unsupported!(format!(
1414                            "UPSERT requires a key/value format: {:?}",
1415                            format
1416                        ))
1417                    }
1418                    None
1419                }
1420                Some(key_encoding) => Some(key_encoding),
1421            };
1422            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
1423            // specified.
1424            if key_envelope == KeyEnvelope::None {
1425                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1426            }
1427            // If the value decode error policy is not set we use the default upsert style.
1428            let style = match value_decode_err_policy.as_slice() {
1429                [] => UpsertStyle::Default(key_envelope),
1430                [SourceErrorPolicy::Inline { alias }] => {
1431                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1432                    UpsertStyle::ValueErrInline {
1433                        key_envelope,
1434                        error_column: alias
1435                            .as_ref()
1436                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1437                    }
1438                }
1439                _ => {
1440                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1441                }
1442            };
1443
1444            UnplannedSourceEnvelope::Upsert { style }
1445        }
1446        ast::SourceEnvelope::CdcV2 => {
1447            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1448            //TODO check that key envelope is not set
1449            match format {
1450                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1451                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1452            }
1453            UnplannedSourceEnvelope::CdcV2
1454        }
1455    };
1456
1457    let metadata_desc = included_column_desc(metadata_columns_desc);
1458    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1459
1460    Ok((desc, envelope, encoding))
1461}
1462
1463/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1464/// of columns and constraints.
1465fn plan_source_export_desc(
1466    scx: &StatementContext,
1467    name: &UnresolvedItemName,
1468    columns: &Vec<ColumnDef<Aug>>,
1469    constraints: &Vec<TableConstraint<Aug>>,
1470) -> Result<RelationDesc, PlanError> {
1471    let names: Vec<_> = columns
1472        .iter()
1473        .map(|c| normalize::column_name(c.name.clone()))
1474        .collect();
1475
1476    if let Some(dup) = names.iter().duplicates().next() {
1477        sql_bail!("column {} specified more than once", dup.quoted());
1478    }
1479
1480    // Build initial relation type that handles declared data types
1481    // and NOT NULL constraints.
1482    let mut column_types = Vec::with_capacity(columns.len());
1483    let mut keys = Vec::new();
1484
1485    for (i, c) in columns.into_iter().enumerate() {
1486        let aug_data_type = &c.data_type;
1487        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1488        let mut nullable = true;
1489        for option in &c.options {
1490            match &option.option {
1491                ColumnOption::NotNull => nullable = false,
1492                ColumnOption::Default(_) => {
1493                    bail_unsupported!("Source export with default value")
1494                }
1495                ColumnOption::Unique { is_primary } => {
1496                    keys.push(vec![i]);
1497                    if *is_primary {
1498                        nullable = false;
1499                    }
1500                }
1501                other => {
1502                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1503                }
1504            }
1505        }
1506        column_types.push(ty.nullable(nullable));
1507    }
1508
1509    let mut seen_primary = false;
1510    'c: for constraint in constraints {
1511        match constraint {
1512            TableConstraint::Unique {
1513                name: _,
1514                columns,
1515                is_primary,
1516                nulls_not_distinct,
1517            } => {
1518                if seen_primary && *is_primary {
1519                    sql_bail!(
1520                        "multiple primary keys for source export {} are not allowed",
1521                        name.to_ast_string_stable()
1522                    );
1523                }
1524                seen_primary = *is_primary || seen_primary;
1525
1526                let mut key = vec![];
1527                for column in columns {
1528                    let column = normalize::column_name(column.clone());
1529                    match names.iter().position(|name| *name == column) {
1530                        None => sql_bail!("unknown column in constraint: {}", column),
1531                        Some(i) => {
1532                            let nullable = &mut column_types[i].nullable;
1533                            if *is_primary {
1534                                if *nulls_not_distinct {
1535                                    sql_bail!(
1536                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1537                                    );
1538                                }
1539                                *nullable = false;
1540                            } else if !(*nulls_not_distinct || !*nullable) {
1541                                // Non-primary key unique constraints are only keys if all of their
1542                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1543                                break 'c;
1544                            }
1545
1546                            key.push(i);
1547                        }
1548                    }
1549                }
1550
1551                if *is_primary {
1552                    keys.insert(0, key);
1553                } else {
1554                    keys.push(key);
1555                }
1556            }
1557            TableConstraint::ForeignKey { .. } => {
1558                bail_unsupported!("Source export with a foreign key")
1559            }
1560            TableConstraint::Check { .. } => {
1561                bail_unsupported!("Source export with a check constraint")
1562            }
1563        }
1564    }
1565
1566    let typ = SqlRelationType::new(column_types).with_keys(keys);
1567    let desc = RelationDesc::new(typ, names);
1568    Ok(desc)
1569}
1570
1571generate_extracted_config!(
1572    CreateSubsourceOption,
1573    (Progress, bool, Default(false)),
1574    (ExternalReference, UnresolvedItemName),
1575    (RetainHistory, OptionalDuration),
1576    (TextColumns, Vec::<Ident>, Default(vec![])),
1577    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1578    (Details, String)
1579);
1580
1581pub fn plan_create_subsource(
1582    scx: &StatementContext,
1583    stmt: CreateSubsourceStatement<Aug>,
1584) -> Result<Plan, PlanError> {
1585    let CreateSubsourceStatement {
1586        name,
1587        columns,
1588        of_source,
1589        constraints,
1590        if_not_exists,
1591        with_options,
1592    } = &stmt;
1593
1594    let CreateSubsourceOptionExtracted {
1595        progress,
1596        retain_history,
1597        external_reference,
1598        text_columns,
1599        exclude_columns,
1600        details,
1601        seen: _,
1602    } = with_options.clone().try_into()?;
1603
1604    // This invariant is enforced during purification; we are responsible for
1605    // creating the AST for subsources as a response to CREATE SOURCE
1606    // statements, so this would fire in integration testing if we failed to
1607    // uphold it.
1608    assert!(
1609        progress ^ (external_reference.is_some() && of_source.is_some()),
1610        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1611    );
1612
1613    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615    let data_source = if let Some(source_reference) = of_source {
1616        // If the new source table syntax is forced we should not be creating any non-progress
1617        // subsources.
1618        if scx.catalog.system_vars().enable_create_table_from_source()
1619            && scx.catalog.system_vars().force_source_table_syntax()
1620        {
1621            Err(PlanError::UseTablesForSources(
1622                "CREATE SUBSOURCE".to_string(),
1623            ))?;
1624        }
1625
1626        // This is a subsource with the "natural" dependency order, i.e. it is
1627        // not a legacy subsource with the inverted structure.
1628        let ingestion_id = *source_reference.item_id();
1629        let external_reference = external_reference.unwrap();
1630
1631        // Decode the details option stored on the subsource statement, which contains information
1632        // created during the purification process.
1633        let details = details
1634            .as_ref()
1635            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1636        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1637        let details =
1638            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1639        let details =
1640            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1641        let details = match details {
1642            SourceExportStatementDetails::Postgres { table } => {
1643                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1644                    column_casts: crate::pure::postgres::generate_column_casts(
1645                        scx,
1646                        &table,
1647                        &text_columns,
1648                    )?,
1649                    table,
1650                })
1651            }
1652            SourceExportStatementDetails::MySql {
1653                table,
1654                initial_gtid_set,
1655            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1656                table,
1657                initial_gtid_set,
1658                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1659                exclude_columns: exclude_columns
1660                    .into_iter()
1661                    .map(|c| c.into_string())
1662                    .collect(),
1663            }),
1664            SourceExportStatementDetails::SqlServer {
1665                table,
1666                capture_instance,
1667                initial_lsn,
1668            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1669                capture_instance,
1670                table,
1671                initial_lsn,
1672                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1673                exclude_columns: exclude_columns
1674                    .into_iter()
1675                    .map(|c| c.into_string())
1676                    .collect(),
1677            }),
1678            SourceExportStatementDetails::LoadGenerator { output } => {
1679                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1680            }
1681            SourceExportStatementDetails::Kafka {} => {
1682                bail_unsupported!("subsources cannot reference Kafka sources")
1683            }
1684        };
1685        DataSourceDesc::IngestionExport {
1686            ingestion_id,
1687            external_reference,
1688            details,
1689            // Subsources don't currently support non-default envelopes / encoding
1690            data_config: SourceExportDataConfig {
1691                envelope: SourceEnvelope::None(NoneEnvelope {
1692                    key_envelope: KeyEnvelope::None,
1693                    key_arity: 0,
1694                }),
1695                encoding: None,
1696            },
1697        }
1698    } else if progress {
1699        DataSourceDesc::Progress
1700    } else {
1701        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1702    };
1703
1704    let if_not_exists = *if_not_exists;
1705    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1706
1707    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1708
1709    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1710    let source = Source {
1711        create_sql,
1712        data_source,
1713        desc,
1714        compaction_window,
1715    };
1716
1717    Ok(Plan::CreateSource(CreateSourcePlan {
1718        name,
1719        source,
1720        if_not_exists,
1721        timeline: Timeline::EpochMilliseconds,
1722        in_cluster: None,
1723    }))
1724}
1725
1726generate_extracted_config!(
1727    TableFromSourceOption,
1728    (TextColumns, Vec::<Ident>, Default(vec![])),
1729    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1730    (PartitionBy, Vec<Ident>),
1731    (RetainHistory, OptionalDuration),
1732    (Details, String)
1733);
1734
1735pub fn plan_create_table_from_source(
1736    scx: &StatementContext,
1737    stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739    if !scx.catalog.system_vars().enable_create_table_from_source() {
1740        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741    }
1742
1743    let CreateTableFromSourceStatement {
1744        name,
1745        columns,
1746        constraints,
1747        if_not_exists,
1748        source,
1749        external_reference,
1750        envelope,
1751        format,
1752        include_metadata,
1753        with_options,
1754    } = &stmt;
1755
1756    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758    let TableFromSourceOptionExtracted {
1759        text_columns,
1760        exclude_columns,
1761        retain_history,
1762        partition_by,
1763        details,
1764        seen: _,
1765    } = with_options.clone().try_into()?;
1766
1767    let source_item = scx.get_item_by_resolved_name(source)?;
1768    let ingestion_id = source_item.id();
1769
1770    // Decode the details option stored on the statement, which contains information
1771    // created during the purification process.
1772    let details = details
1773        .as_ref()
1774        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776    let details =
1777        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778    let details =
1779        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782        && include_metadata
1783            .iter()
1784            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785    {
1786        // TODO(guswynn): should this be `bail_unsupported!`?
1787        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1788    }
1789    if !matches!(
1790        details,
1791        SourceExportStatementDetails::Kafka { .. }
1792            | SourceExportStatementDetails::LoadGenerator { .. }
1793    ) && !include_metadata.is_empty()
1794    {
1795        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796    }
1797
1798    let details = match details {
1799        SourceExportStatementDetails::Postgres { table } => {
1800            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1801                column_casts: crate::pure::postgres::generate_column_casts(
1802                    scx,
1803                    &table,
1804                    &text_columns,
1805                )?,
1806                table,
1807            })
1808        }
1809        SourceExportStatementDetails::MySql {
1810            table,
1811            initial_gtid_set,
1812        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1813            table,
1814            initial_gtid_set,
1815            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1816            exclude_columns: exclude_columns
1817                .into_iter()
1818                .map(|c| c.into_string())
1819                .collect(),
1820        }),
1821        SourceExportStatementDetails::SqlServer {
1822            table,
1823            capture_instance,
1824            initial_lsn,
1825        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1826            table,
1827            capture_instance,
1828            initial_lsn,
1829            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830            exclude_columns: exclude_columns
1831                .into_iter()
1832                .map(|c| c.into_string())
1833                .collect(),
1834        }),
1835        SourceExportStatementDetails::LoadGenerator { output } => {
1836            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1837        }
1838        SourceExportStatementDetails::Kafka {} => {
1839            if !include_metadata.is_empty()
1840                && !matches!(
1841                    envelope,
1842                    ast::SourceEnvelope::Upsert { .. }
1843                        | ast::SourceEnvelope::None
1844                        | ast::SourceEnvelope::Debezium
1845                )
1846            {
1847                // TODO(guswynn): should this be `bail_unsupported!`?
1848                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1849            }
1850
1851            let metadata_columns = include_metadata
1852                .into_iter()
1853                .flat_map(|item| match item {
1854                    SourceIncludeMetadata::Timestamp { alias } => {
1855                        let name = match alias {
1856                            Some(name) => name.to_string(),
1857                            None => "timestamp".to_owned(),
1858                        };
1859                        Some((name, KafkaMetadataKind::Timestamp))
1860                    }
1861                    SourceIncludeMetadata::Partition { alias } => {
1862                        let name = match alias {
1863                            Some(name) => name.to_string(),
1864                            None => "partition".to_owned(),
1865                        };
1866                        Some((name, KafkaMetadataKind::Partition))
1867                    }
1868                    SourceIncludeMetadata::Offset { alias } => {
1869                        let name = match alias {
1870                            Some(name) => name.to_string(),
1871                            None => "offset".to_owned(),
1872                        };
1873                        Some((name, KafkaMetadataKind::Offset))
1874                    }
1875                    SourceIncludeMetadata::Headers { alias } => {
1876                        let name = match alias {
1877                            Some(name) => name.to_string(),
1878                            None => "headers".to_owned(),
1879                        };
1880                        Some((name, KafkaMetadataKind::Headers))
1881                    }
1882                    SourceIncludeMetadata::Header {
1883                        alias,
1884                        key,
1885                        use_bytes,
1886                    } => Some((
1887                        alias.to_string(),
1888                        KafkaMetadataKind::Header {
1889                            key: key.clone(),
1890                            use_bytes: *use_bytes,
1891                        },
1892                    )),
1893                    SourceIncludeMetadata::Key { .. } => {
1894                        // handled below
1895                        None
1896                    }
1897                })
1898                .collect();
1899
1900            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1901        }
1902    };
1903
1904    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1905
1906    // Some source-types (e.g. postgres, mysql, multi-output load-gen sources) define a value_schema
1907    // during purification and define the `columns` and `constraints` fields for the statement,
1908    // whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
1909    // we use the source connection's default schema.
1910    let (key_desc, value_desc) =
1911        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1912            let columns = match columns {
1913                TableFromSourceColumns::Defined(columns) => columns,
1914                _ => unreachable!(),
1915            };
1916            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1917            (None, desc)
1918        } else {
1919            let key_desc = source_connection.default_key_desc();
1920            let value_desc = source_connection.default_value_desc();
1921            (Some(key_desc), value_desc)
1922        };
1923
1924    let metadata_columns_desc = match &details {
1925        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1926            metadata_columns, ..
1927        }) => kafka_metadata_columns_desc(metadata_columns),
1928        _ => vec![],
1929    };
1930
1931    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1932        scx,
1933        &envelope,
1934        format,
1935        key_desc,
1936        value_desc,
1937        include_metadata,
1938        metadata_columns_desc,
1939        source_connection,
1940    )?;
1941    if let TableFromSourceColumns::Named(col_names) = columns {
1942        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1943    }
1944
1945    let names: Vec<_> = desc.iter_names().cloned().collect();
1946    if let Some(dup) = names.iter().duplicates().next() {
1947        sql_bail!("column {} specified more than once", dup.quoted());
1948    }
1949
1950    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1951
1952    // Allow users to specify a timeline. If they do not, determine a default
1953    // timeline for the source.
1954    let timeline = match envelope {
1955        SourceEnvelope::CdcV2 => {
1956            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1957        }
1958        _ => Timeline::EpochMilliseconds,
1959    };
1960
1961    if let Some(partition_by) = partition_by {
1962        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1963        check_partition_by(&desc, partition_by)?;
1964    }
1965
1966    let data_source = DataSourceDesc::IngestionExport {
1967        ingestion_id,
1968        external_reference: external_reference
1969            .as_ref()
1970            .expect("populated in purification")
1971            .clone(),
1972        details,
1973        data_config: SourceExportDataConfig { envelope, encoding },
1974    };
1975
1976    let if_not_exists = *if_not_exists;
1977
1978    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1979
1980    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1981    let table = Table {
1982        create_sql,
1983        desc: VersionedRelationDesc::new(desc),
1984        temporary: false,
1985        compaction_window,
1986        data_source: TableDataSource::DataSource {
1987            desc: data_source,
1988            timeline,
1989        },
1990    };
1991
1992    Ok(Plan::CreateTable(CreateTablePlan {
1993        name,
1994        table,
1995        if_not_exists,
1996    }))
1997}
1998
1999generate_extracted_config!(
2000    LoadGeneratorOption,
2001    (TickInterval, Duration),
2002    (AsOf, u64, Default(0_u64)),
2003    (UpTo, u64, Default(u64::MAX)),
2004    (ScaleFactor, f64),
2005    (MaxCardinality, u64),
2006    (Keys, u64),
2007    (SnapshotRounds, u64),
2008    (TransactionalSnapshot, bool),
2009    (ValueSize, u64),
2010    (Seed, u64),
2011    (Partitions, u64),
2012    (BatchSize, u64)
2013);
2014
2015impl LoadGeneratorOptionExtracted {
2016    pub(super) fn ensure_only_valid_options(
2017        &self,
2018        loadgen: &ast::LoadGenerator,
2019    ) -> Result<(), PlanError> {
2020        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2021
2022        let mut options = self.seen.clone();
2023
2024        let permitted_options: &[_] = match loadgen {
2025            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2026            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2027            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2028            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2029            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2030            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2031            ast::LoadGenerator::KeyValue => &[
2032                TickInterval,
2033                Keys,
2034                SnapshotRounds,
2035                TransactionalSnapshot,
2036                ValueSize,
2037                Seed,
2038                Partitions,
2039                BatchSize,
2040            ],
2041        };
2042
2043        for o in permitted_options {
2044            options.remove(o);
2045        }
2046
2047        if !options.is_empty() {
2048            sql_bail!(
2049                "{} load generators do not support {} values",
2050                loadgen,
2051                options.iter().join(", ")
2052            )
2053        }
2054
2055        Ok(())
2056    }
2057}
2058
2059pub(crate) fn load_generator_ast_to_generator(
2060    scx: &StatementContext,
2061    loadgen: &ast::LoadGenerator,
2062    options: &[LoadGeneratorOption<Aug>],
2063    include_metadata: &[SourceIncludeMetadata],
2064) -> Result<LoadGenerator, PlanError> {
2065    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2066    extracted.ensure_only_valid_options(loadgen)?;
2067
2068    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2069        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2070    }
2071
2072    let load_generator = match loadgen {
2073        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2074        ast::LoadGenerator::Clock => {
2075            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2076            LoadGenerator::Clock
2077        }
2078        ast::LoadGenerator::Counter => {
2079            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2080            let LoadGeneratorOptionExtracted {
2081                max_cardinality, ..
2082            } = extracted;
2083            LoadGenerator::Counter { max_cardinality }
2084        }
2085        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2086        ast::LoadGenerator::Datums => {
2087            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2088            LoadGenerator::Datums
2089        }
2090        ast::LoadGenerator::Tpch => {
2091            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2092
2093            // Default to 0.01 scale factor (=10MB).
2094            let sf: f64 = scale_factor.unwrap_or(0.01);
2095            if !sf.is_finite() || sf < 0.0 {
2096                sql_bail!("unsupported scale factor {sf}");
2097            }
2098
2099            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2100                let total = (sf * multiplier).floor();
2101                let mut i = i64::try_cast_from(total)
2102                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2103                if i < 1 {
2104                    i = 1;
2105                }
2106                Ok(i)
2107            };
2108
2109            // The multiplications here are safely unchecked because they will
2110            // overflow to infinity, which will be caught by f64_to_i64.
2111            let count_supplier = f_to_i(10_000f64)?;
2112            let count_part = f_to_i(200_000f64)?;
2113            let count_customer = f_to_i(150_000f64)?;
2114            let count_orders = f_to_i(150_000f64 * 10f64)?;
2115            let count_clerk = f_to_i(1_000f64)?;
2116
2117            LoadGenerator::Tpch {
2118                count_supplier,
2119                count_part,
2120                count_customer,
2121                count_orders,
2122                count_clerk,
2123            }
2124        }
2125        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2126            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2127            let LoadGeneratorOptionExtracted {
2128                keys,
2129                snapshot_rounds,
2130                transactional_snapshot,
2131                value_size,
2132                tick_interval,
2133                seed,
2134                partitions,
2135                batch_size,
2136                ..
2137            } = extracted;
2138
2139            let mut include_offset = None;
2140            for im in include_metadata {
2141                match im {
2142                    SourceIncludeMetadata::Offset { alias } => {
2143                        include_offset = match alias {
2144                            Some(alias) => Some(alias.to_string()),
2145                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2146                        }
2147                    }
2148                    SourceIncludeMetadata::Key { .. } => continue,
2149
2150                    _ => {
2151                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2152                    }
2153                };
2154            }
2155
2156            let lgkv = KeyValueLoadGenerator {
2157                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2158                snapshot_rounds: snapshot_rounds
2159                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2160                // Defaults to true.
2161                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2162                value_size: value_size
2163                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2164                partitions: partitions
2165                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2166                tick_interval,
2167                batch_size: batch_size
2168                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2169                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2170                include_offset,
2171            };
2172
2173            if lgkv.keys == 0
2174                || lgkv.partitions == 0
2175                || lgkv.value_size == 0
2176                || lgkv.batch_size == 0
2177            {
2178                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2179            }
2180
2181            if lgkv.keys % lgkv.partitions != 0 {
2182                sql_bail!("KEYS must be a multiple of PARTITIONS")
2183            }
2184
2185            if lgkv.batch_size > lgkv.keys {
2186                sql_bail!("KEYS must be larger than BATCH SIZE")
2187            }
2188
2189            // This constraints simplifies the source implementation.
2190            // We can lift it later.
2191            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2192                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2193            }
2194
2195            if lgkv.snapshot_rounds == 0 {
2196                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2197            }
2198
2199            LoadGenerator::KeyValue(lgkv)
2200        }
2201    };
2202
2203    Ok(load_generator)
2204}
2205
2206fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2207    let before = value_desc.get_by_name(&"before".into());
2208    let (after_idx, after_ty) = value_desc
2209        .get_by_name(&"after".into())
2210        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2211    let before_idx = if let Some((before_idx, before_ty)) = before {
2212        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2213            sql_bail!("'before' column must be of type record");
2214        }
2215        if before_ty != after_ty {
2216            sql_bail!("'before' type differs from 'after' column");
2217        }
2218        Some(before_idx)
2219    } else {
2220        None
2221    };
2222    Ok((before_idx, after_idx))
2223}
2224
2225fn get_encoding(
2226    scx: &StatementContext,
2227    format: &FormatSpecifier<Aug>,
2228    envelope: &ast::SourceEnvelope,
2229) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2230    let encoding = match format {
2231        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2232        FormatSpecifier::KeyValue { key, value } => {
2233            let key = {
2234                let encoding = get_encoding_inner(scx, key)?;
2235                Some(encoding.key.unwrap_or(encoding.value))
2236            };
2237            let value = get_encoding_inner(scx, value)?.value;
2238            SourceDataEncoding { key, value }
2239        }
2240    };
2241
2242    let requires_keyvalue = matches!(
2243        envelope,
2244        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2245    );
2246    let is_keyvalue = encoding.key.is_some();
2247    if requires_keyvalue && !is_keyvalue {
2248        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2249    };
2250
2251    Ok(encoding)
2252}
2253
2254/// Determine the cluster ID to use for this item.
2255///
2256/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2257/// Because of this, do not normalize/canonicalize the create SQL statement
2258/// until after calling this function.
2259fn source_sink_cluster_config<'a, 'ctx>(
2260    scx: &'a StatementContext<'ctx>,
2261    in_cluster: &mut Option<ResolvedClusterName>,
2262) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2263    let cluster = match in_cluster {
2264        None => {
2265            let cluster = scx.catalog.resolve_cluster(None)?;
2266            *in_cluster = Some(ResolvedClusterName {
2267                id: cluster.id(),
2268                print_name: None,
2269            });
2270            cluster
2271        }
2272        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2273    };
2274
2275    Ok(cluster)
2276}
2277
2278generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2279
2280#[derive(Debug)]
2281pub struct Schema {
2282    pub key_schema: Option<String>,
2283    pub value_schema: String,
2284    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2285    pub confluent_wire_format: bool,
2286}
2287
2288fn get_encoding_inner(
2289    scx: &StatementContext,
2290    format: &Format<Aug>,
2291) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2292    let value = match format {
2293        Format::Bytes => DataEncoding::Bytes,
2294        Format::Avro(schema) => {
2295            let Schema {
2296                key_schema,
2297                value_schema,
2298                csr_connection,
2299                confluent_wire_format,
2300            } = match schema {
2301                // TODO(jldlaughlin): we need a way to pass in primary key information
2302                // when building a source from a string or file.
2303                AvroSchema::InlineSchema {
2304                    schema: ast::Schema { schema },
2305                    with_options,
2306                } => {
2307                    let AvroSchemaOptionExtracted {
2308                        confluent_wire_format,
2309                        ..
2310                    } = with_options.clone().try_into()?;
2311
2312                    Schema {
2313                        key_schema: None,
2314                        value_schema: schema.clone(),
2315                        csr_connection: None,
2316                        confluent_wire_format,
2317                    }
2318                }
2319                AvroSchema::Csr {
2320                    csr_connection:
2321                        CsrConnectionAvro {
2322                            connection,
2323                            seed,
2324                            key_strategy: _,
2325                            value_strategy: _,
2326                        },
2327                } => {
2328                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2329                    let csr_connection = match item.connection()? {
2330                        Connection::Csr(_) => item.id(),
2331                        _ => {
2332                            sql_bail!(
2333                                "{} is not a schema registry connection",
2334                                scx.catalog
2335                                    .resolve_full_name(item.name())
2336                                    .to_string()
2337                                    .quoted()
2338                            )
2339                        }
2340                    };
2341
2342                    if let Some(seed) = seed {
2343                        Schema {
2344                            key_schema: seed.key_schema.clone(),
2345                            value_schema: seed.value_schema.clone(),
2346                            csr_connection: Some(csr_connection),
2347                            confluent_wire_format: true,
2348                        }
2349                    } else {
2350                        unreachable!("CSR seed resolution should already have been called: Avro")
2351                    }
2352                }
2353            };
2354
2355            if let Some(key_schema) = key_schema {
2356                return Ok(SourceDataEncoding {
2357                    key: Some(DataEncoding::Avro(AvroEncoding {
2358                        schema: key_schema,
2359                        csr_connection: csr_connection.clone(),
2360                        confluent_wire_format,
2361                    })),
2362                    value: DataEncoding::Avro(AvroEncoding {
2363                        schema: value_schema,
2364                        csr_connection,
2365                        confluent_wire_format,
2366                    }),
2367                });
2368            } else {
2369                DataEncoding::Avro(AvroEncoding {
2370                    schema: value_schema,
2371                    csr_connection,
2372                    confluent_wire_format,
2373                })
2374            }
2375        }
2376        Format::Protobuf(schema) => match schema {
2377            ProtobufSchema::Csr {
2378                csr_connection:
2379                    CsrConnectionProtobuf {
2380                        connection:
2381                            CsrConnection {
2382                                connection,
2383                                options,
2384                            },
2385                        seed,
2386                    },
2387            } => {
2388                if let Some(CsrSeedProtobuf { key, value }) = seed {
2389                    let item = scx.get_item_by_resolved_name(connection)?;
2390                    let _ = match item.connection()? {
2391                        Connection::Csr(connection) => connection,
2392                        _ => {
2393                            sql_bail!(
2394                                "{} is not a schema registry connection",
2395                                scx.catalog
2396                                    .resolve_full_name(item.name())
2397                                    .to_string()
2398                                    .quoted()
2399                            )
2400                        }
2401                    };
2402
2403                    if !options.is_empty() {
2404                        sql_bail!("Protobuf CSR connections do not support any options");
2405                    }
2406
2407                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2408                        descriptors: strconv::parse_bytes(&value.schema)?,
2409                        message_name: value.message_name.clone(),
2410                        confluent_wire_format: true,
2411                    });
2412                    if let Some(key) = key {
2413                        return Ok(SourceDataEncoding {
2414                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2415                                descriptors: strconv::parse_bytes(&key.schema)?,
2416                                message_name: key.message_name.clone(),
2417                                confluent_wire_format: true,
2418                            })),
2419                            value,
2420                        });
2421                    }
2422                    value
2423                } else {
2424                    unreachable!("CSR seed resolution should already have been called: Proto")
2425                }
2426            }
2427            ProtobufSchema::InlineSchema {
2428                message_name,
2429                schema: ast::Schema { schema },
2430            } => {
2431                let descriptors = strconv::parse_bytes(schema)?;
2432
2433                DataEncoding::Protobuf(ProtobufEncoding {
2434                    descriptors,
2435                    message_name: message_name.to_owned(),
2436                    confluent_wire_format: false,
2437                })
2438            }
2439        },
2440        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2441            regex: mz_repr::adt::regex::Regex::new(regex, false)
2442                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2443        }),
2444        Format::Csv { columns, delimiter } => {
2445            let columns = match columns {
2446                CsvColumns::Header { names } => {
2447                    if names.is_empty() {
2448                        sql_bail!("[internal error] column spec should get names in purify")
2449                    }
2450                    ColumnSpec::Header {
2451                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2452                    }
2453                }
2454                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2455            };
2456            DataEncoding::Csv(CsvEncoding {
2457                columns,
2458                delimiter: u8::try_from(*delimiter)
2459                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2460            })
2461        }
2462        Format::Json { array: false } => DataEncoding::Json,
2463        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2464        Format::Text => DataEncoding::Text,
2465    };
2466    Ok(SourceDataEncoding { key: None, value })
2467}
2468
2469/// Extract the key envelope, if it is requested
2470fn get_key_envelope(
2471    included_items: &[SourceIncludeMetadata],
2472    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2473    key_envelope_no_encoding: bool,
2474) -> Result<KeyEnvelope, PlanError> {
2475    let key_definition = included_items
2476        .iter()
2477        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2478    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2479        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2480            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2481            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2482            (Some(name), _) if key_envelope_no_encoding => {
2483                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2484            }
2485            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2486            (_, None) => {
2487                // `kd.alias` == `None` means `INCLUDE KEY`
2488                // `kd.alias` == `Some(_) means INCLUDE KEY AS ___`
2489                // These both make sense with the same error message
2490                sql_bail!(
2491                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2492                        got bare FORMAT"
2493                );
2494            }
2495        }
2496    } else {
2497        Ok(KeyEnvelope::None)
2498    }
2499}
2500
2501/// Gets the key envelope for a given key encoding when no name for the key has
2502/// been requested by the user.
2503fn get_unnamed_key_envelope(
2504    key: Option<&DataEncoding<ReferencedConnection>>,
2505) -> Result<KeyEnvelope, PlanError> {
2506    // If the key is requested but comes from an unnamed type then it gets the name "key"
2507    //
2508    // Otherwise it gets the names of the columns in the type
2509    let is_composite = match key {
2510        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2511        Some(
2512            DataEncoding::Avro(_)
2513            | DataEncoding::Csv(_)
2514            | DataEncoding::Protobuf(_)
2515            | DataEncoding::Regex { .. },
2516        ) => true,
2517        None => false,
2518    };
2519
2520    if is_composite {
2521        Ok(KeyEnvelope::Flattened)
2522    } else {
2523        Ok(KeyEnvelope::Named("key".to_string()))
2524    }
2525}
2526
2527pub fn describe_create_view(
2528    _: &StatementContext,
2529    _: CreateViewStatement<Aug>,
2530) -> Result<StatementDesc, PlanError> {
2531    Ok(StatementDesc::new(None))
2532}
2533
2534pub fn plan_view(
2535    scx: &StatementContext,
2536    def: &mut ViewDefinition<Aug>,
2537    temporary: bool,
2538) -> Result<(QualifiedItemName, View), PlanError> {
2539    let create_sql = normalize::create_statement(
2540        scx,
2541        Statement::CreateView(CreateViewStatement {
2542            if_exists: IfExistsBehavior::Error,
2543            temporary,
2544            definition: def.clone(),
2545        }),
2546    )?;
2547
2548    let ViewDefinition {
2549        name,
2550        columns,
2551        query,
2552    } = def;
2553
2554    let query::PlannedRootQuery {
2555        expr,
2556        mut desc,
2557        finishing,
2558        scope: _,
2559    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2560    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2561    // Note: Earlier, we were thinking to maybe persist the finishing information with the view
2562    // here to help with database-issues#236. However, in the meantime, there might be a better
2563    // approach to solve database-issues#236:
2564    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2565    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2566        &finishing,
2567        expr.arity()
2568    ));
2569    if expr.contains_parameters()? {
2570        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2571    }
2572
2573    let dependencies = expr
2574        .depends_on()
2575        .into_iter()
2576        .map(|gid| scx.catalog.resolve_item_id(&gid))
2577        .collect();
2578
2579    let name = if temporary {
2580        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2581    } else {
2582        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2583    };
2584
2585    plan_utils::maybe_rename_columns(
2586        format!("view {}", scx.catalog.resolve_full_name(&name)),
2587        &mut desc,
2588        columns,
2589    )?;
2590    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2591
2592    if let Some(dup) = names.iter().duplicates().next() {
2593        sql_bail!("column {} specified more than once", dup.quoted());
2594    }
2595
2596    let view = View {
2597        create_sql,
2598        expr,
2599        dependencies,
2600        column_names: names,
2601        temporary,
2602    };
2603
2604    Ok((name, view))
2605}
2606
2607pub fn plan_create_view(
2608    scx: &StatementContext,
2609    mut stmt: CreateViewStatement<Aug>,
2610) -> Result<Plan, PlanError> {
2611    let CreateViewStatement {
2612        temporary,
2613        if_exists,
2614        definition,
2615    } = &mut stmt;
2616    let (name, view) = plan_view(scx, definition, *temporary)?;
2617
2618    // Override the statement-level IfExistsBehavior with Skip if this is
2619    // explicitly requested in the PlanContext (the default is `false`).
2620    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2621
2622    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2623        let if_exists = true;
2624        let cascade = false;
2625        let maybe_item_to_drop = plan_drop_item(
2626            scx,
2627            ObjectType::View,
2628            if_exists,
2629            definition.name.clone(),
2630            cascade,
2631        )?;
2632
2633        // Check if the new View depends on the item that we would be replacing.
2634        if let Some(id) = maybe_item_to_drop {
2635            let dependencies = view.expr.depends_on();
2636            let invalid_drop = scx
2637                .get_item(&id)
2638                .global_ids()
2639                .any(|gid| dependencies.contains(&gid));
2640            if invalid_drop {
2641                let item = scx.catalog.get_item(&id);
2642                sql_bail!(
2643                    "cannot replace view {0}: depended upon by new {0} definition",
2644                    scx.catalog.resolve_full_name(item.name())
2645                );
2646            }
2647
2648            Some(id)
2649        } else {
2650            None
2651        }
2652    } else {
2653        None
2654    };
2655    let drop_ids = replace
2656        .map(|id| {
2657            scx.catalog
2658                .item_dependents(id)
2659                .into_iter()
2660                .map(|id| id.unwrap_item_id())
2661                .collect()
2662        })
2663        .unwrap_or_default();
2664
2665    // Check for an object in the catalog with this same name
2666    let full_name = scx.catalog.resolve_full_name(&name);
2667    let partial_name = PartialItemName::from(full_name.clone());
2668    // For PostgreSQL compatibility, we need to prevent creating views when
2669    // there is an existing object *or* type of the same name.
2670    if let (Ok(item), IfExistsBehavior::Error, false) = (
2671        scx.catalog.resolve_item_or_type(&partial_name),
2672        *if_exists,
2673        ignore_if_exists_errors,
2674    ) {
2675        return Err(PlanError::ItemAlreadyExists {
2676            name: full_name.to_string(),
2677            item_type: item.item_type(),
2678        });
2679    }
2680
2681    Ok(Plan::CreateView(CreateViewPlan {
2682        name,
2683        view,
2684        replace,
2685        drop_ids,
2686        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2687        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2688    }))
2689}
2690
2691pub fn describe_create_materialized_view(
2692    _: &StatementContext,
2693    _: CreateMaterializedViewStatement<Aug>,
2694) -> Result<StatementDesc, PlanError> {
2695    Ok(StatementDesc::new(None))
2696}
2697
2698pub fn describe_create_continual_task(
2699    _: &StatementContext,
2700    _: CreateContinualTaskStatement<Aug>,
2701) -> Result<StatementDesc, PlanError> {
2702    Ok(StatementDesc::new(None))
2703}
2704
2705pub fn describe_create_network_policy(
2706    _: &StatementContext,
2707    _: CreateNetworkPolicyStatement<Aug>,
2708) -> Result<StatementDesc, PlanError> {
2709    Ok(StatementDesc::new(None))
2710}
2711
2712pub fn describe_alter_network_policy(
2713    _: &StatementContext,
2714    _: AlterNetworkPolicyStatement<Aug>,
2715) -> Result<StatementDesc, PlanError> {
2716    Ok(StatementDesc::new(None))
2717}
2718
2719pub fn plan_create_materialized_view(
2720    scx: &StatementContext,
2721    mut stmt: CreateMaterializedViewStatement<Aug>,
2722) -> Result<Plan, PlanError> {
2723    let cluster_id =
2724        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2725    stmt.in_cluster = Some(ResolvedClusterName {
2726        id: cluster_id,
2727        print_name: None,
2728    });
2729
2730    let create_sql =
2731        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2732
2733    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2734    let name = scx.allocate_qualified_name(partial_name.clone())?;
2735
2736    // Validate the replacement target, if one is given.
2737    let replacement_target = if let Some(target_name) = &stmt.replacing {
2738        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
2739
2740        let target = scx.get_item_by_resolved_name(target_name)?;
2741        if target.item_type() != CatalogItemType::MaterializedView {
2742            return Err(PlanError::InvalidReplacement {
2743                item_type: target.item_type(),
2744                item_name: scx.catalog.minimal_qualification(target.name()),
2745                replacement_type: CatalogItemType::MaterializedView,
2746                replacement_name: partial_name,
2747            });
2748        }
2749        if target.id().is_system() {
2750            sql_bail!(
2751                "cannot replace {} because it is required by the database system",
2752                scx.catalog.minimal_qualification(target.name()),
2753            );
2754        }
2755        Some(target.id())
2756    } else {
2757        None
2758    };
2759
2760    let query::PlannedRootQuery {
2761        expr,
2762        mut desc,
2763        finishing,
2764        scope: _,
2765    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2766    // We get back a trivial finishing, see comment in `plan_view`.
2767    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2768        &finishing,
2769        expr.arity()
2770    ));
2771    if expr.contains_parameters()? {
2772        return Err(PlanError::ParameterNotAllowed(
2773            "materialized views".to_string(),
2774        ));
2775    }
2776
2777    plan_utils::maybe_rename_columns(
2778        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2779        &mut desc,
2780        &stmt.columns,
2781    )?;
2782    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2783
2784    let MaterializedViewOptionExtracted {
2785        assert_not_null,
2786        partition_by,
2787        retain_history,
2788        refresh,
2789        seen: _,
2790    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2791
2792    if let Some(partition_by) = partition_by {
2793        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2794        check_partition_by(&desc, partition_by)?;
2795    }
2796
2797    let refresh_schedule = {
2798        let mut refresh_schedule = RefreshSchedule::default();
2799        let mut on_commits_seen = 0;
2800        for refresh_option_value in refresh {
2801            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2802                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2803            }
2804            match refresh_option_value {
2805                RefreshOptionValue::OnCommit => {
2806                    on_commits_seen += 1;
2807                }
2808                RefreshOptionValue::AtCreation => {
2809                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2810                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2811                }
2812                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2813                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2814                    let ecx = &ExprContext {
2815                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2816                        name: "REFRESH AT",
2817                        scope: &Scope::empty(),
2818                        relation_type: &SqlRelationType::empty(),
2819                        allow_aggregates: false,
2820                        allow_subqueries: false,
2821                        allow_parameters: false,
2822                        allow_windows: false,
2823                    };
2824                    let hir = plan_expr(ecx, &time)?.cast_to(
2825                        ecx,
2826                        CastContext::Assignment,
2827                        &SqlScalarType::MzTimestamp,
2828                    )?;
2829                    // (mz_now was purified away to a literal earlier)
2830                    let timestamp = hir
2831                        .into_literal_mz_timestamp()
2832                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2833                    refresh_schedule.ats.push(timestamp);
2834                }
2835                RefreshOptionValue::Every(RefreshEveryOptionValue {
2836                    interval,
2837                    aligned_to,
2838                }) => {
2839                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2840                    if interval.as_microseconds() <= 0 {
2841                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2842                    }
2843                    if interval.months != 0 {
2844                        // This limitation is because we want Intervals to be cleanly convertable
2845                        // to a unix epoch timestamp difference. When the interval involves months, then
2846                        // this is not true anymore, because months have variable lengths.
2847                        // See `Timestamp::round_up`.
2848                        sql_bail!("REFRESH interval must not involve units larger than days");
2849                    }
2850                    let interval = interval.duration()?;
2851                    if u64::try_from(interval.as_millis()).is_err() {
2852                        sql_bail!("REFRESH interval too large");
2853                    }
2854
2855                    let mut aligned_to = match aligned_to {
2856                        Some(aligned_to) => aligned_to,
2857                        None => {
2858                            soft_panic_or_log!(
2859                                "ALIGNED TO should have been filled in by purification"
2860                            );
2861                            sql_bail!(
2862                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2863                            )
2864                        }
2865                    };
2866
2867                    // Desugar the `aligned_to` expression
2868                    transform_ast::transform(scx, &mut aligned_to)?;
2869
2870                    let ecx = &ExprContext {
2871                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2872                        name: "REFRESH EVERY ... ALIGNED TO",
2873                        scope: &Scope::empty(),
2874                        relation_type: &SqlRelationType::empty(),
2875                        allow_aggregates: false,
2876                        allow_subqueries: false,
2877                        allow_parameters: false,
2878                        allow_windows: false,
2879                    };
2880                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2881                        ecx,
2882                        CastContext::Assignment,
2883                        &SqlScalarType::MzTimestamp,
2884                    )?;
2885                    // (mz_now was purified away to a literal earlier)
2886                    let aligned_to_const = aligned_to_hir
2887                        .into_literal_mz_timestamp()
2888                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2889
2890                    refresh_schedule.everies.push(RefreshEvery {
2891                        interval,
2892                        aligned_to: aligned_to_const,
2893                    });
2894                }
2895            }
2896        }
2897
2898        if on_commits_seen > 1 {
2899            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2900        }
2901        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2902            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2903        }
2904
2905        if refresh_schedule == RefreshSchedule::default() {
2906            None
2907        } else {
2908            Some(refresh_schedule)
2909        }
2910    };
2911
2912    let as_of = stmt.as_of.map(Timestamp::from);
2913    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2914    let mut non_null_assertions = assert_not_null
2915        .into_iter()
2916        .map(normalize::column_name)
2917        .map(|assertion_name| {
2918            column_names
2919                .iter()
2920                .position(|col| col == &assertion_name)
2921                .ok_or_else(|| {
2922                    sql_err!(
2923                        "column {} in ASSERT NOT NULL option not found",
2924                        assertion_name.quoted()
2925                    )
2926                })
2927        })
2928        .collect::<Result<Vec<_>, _>>()?;
2929    non_null_assertions.sort();
2930    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2931        let dup = &column_names[*dup];
2932        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2933    }
2934
2935    if let Some(dup) = column_names.iter().duplicates().next() {
2936        sql_bail!("column {} specified more than once", dup.quoted());
2937    }
2938
2939    // Override the statement-level IfExistsBehavior with Skip if this is
2940    // explicitly requested in the PlanContext (the default is `false`).
2941    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2942        Ok(true) => IfExistsBehavior::Skip,
2943        _ => stmt.if_exists,
2944    };
2945
2946    let mut replace = None;
2947    let mut if_not_exists = false;
2948    match if_exists {
2949        IfExistsBehavior::Replace => {
2950            let if_exists = true;
2951            let cascade = false;
2952            let replace_id = plan_drop_item(
2953                scx,
2954                ObjectType::MaterializedView,
2955                if_exists,
2956                partial_name.into(),
2957                cascade,
2958            )?;
2959
2960            // Check if the new Materialized View depends on the item that we would be replacing.
2961            if let Some(id) = replace_id {
2962                let dependencies = expr.depends_on();
2963                let invalid_drop = scx
2964                    .get_item(&id)
2965                    .global_ids()
2966                    .any(|gid| dependencies.contains(&gid));
2967                if invalid_drop {
2968                    let item = scx.catalog.get_item(&id);
2969                    sql_bail!(
2970                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2971                        scx.catalog.resolve_full_name(item.name())
2972                    );
2973                }
2974                replace = Some(id);
2975            }
2976        }
2977        IfExistsBehavior::Skip => if_not_exists = true,
2978        IfExistsBehavior::Error => (),
2979    }
2980    let drop_ids = replace
2981        .map(|id| {
2982            scx.catalog
2983                .item_dependents(id)
2984                .into_iter()
2985                .map(|id| id.unwrap_item_id())
2986                .collect()
2987        })
2988        .unwrap_or_default();
2989    let mut dependencies: BTreeSet<_> = expr
2990        .depends_on()
2991        .into_iter()
2992        .map(|gid| scx.catalog.resolve_item_id(&gid))
2993        .collect();
2994
2995    if let Some(id) = replacement_target {
2996        dependencies.insert(id);
2997    }
2998
2999    // Check for an object in the catalog with this same name
3000    let full_name = scx.catalog.resolve_full_name(&name);
3001    let partial_name = PartialItemName::from(full_name.clone());
3002    // For PostgreSQL compatibility, we need to prevent creating materialized
3003    // views when there is an existing object *or* type of the same name.
3004    if let (IfExistsBehavior::Error, Ok(item)) =
3005        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3006    {
3007        return Err(PlanError::ItemAlreadyExists {
3008            name: full_name.to_string(),
3009            item_type: item.item_type(),
3010        });
3011    }
3012
3013    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3014        name,
3015        materialized_view: MaterializedView {
3016            create_sql,
3017            expr,
3018            dependencies: DependencyIds(dependencies),
3019            column_names,
3020            replacement_target,
3021            cluster_id,
3022            non_null_assertions,
3023            compaction_window,
3024            refresh_schedule,
3025            as_of,
3026        },
3027        replace,
3028        drop_ids,
3029        if_not_exists,
3030        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3031    }))
3032}
3033
3034generate_extracted_config!(
3035    MaterializedViewOption,
3036    (AssertNotNull, Ident, AllowMultiple),
3037    (PartitionBy, Vec<Ident>),
3038    (RetainHistory, OptionalDuration),
3039    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
3040);
3041
3042pub fn plan_create_continual_task(
3043    scx: &StatementContext,
3044    mut stmt: CreateContinualTaskStatement<Aug>,
3045) -> Result<Plan, PlanError> {
3046    match &stmt.sugar {
3047        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3048        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3049            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3050        }
3051        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3052            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3053        }
3054    };
3055    let cluster_id = match &stmt.in_cluster {
3056        None => scx.catalog.resolve_cluster(None)?.id(),
3057        Some(in_cluster) => in_cluster.id,
3058    };
3059    stmt.in_cluster = Some(ResolvedClusterName {
3060        id: cluster_id,
3061        print_name: None,
3062    });
3063
3064    let create_sql =
3065        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3066
3067    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3068
3069    // It seems desirable for a CT that e.g. simply filters the input to keep
3070    // the same nullability. So, start by assuming all columns are non-nullable,
3071    // and then make them nullable below if any of the exprs plan them as
3072    // nullable.
3073    let mut desc = match stmt.columns {
3074        None => None,
3075        Some(columns) => {
3076            let mut desc_columns = Vec::with_capacity(columns.capacity());
3077            for col in columns.iter() {
3078                desc_columns.push((
3079                    normalize::column_name(col.name.clone()),
3080                    SqlColumnType {
3081                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3082                        nullable: false,
3083                    },
3084                ));
3085            }
3086            Some(RelationDesc::from_names_and_types(desc_columns))
3087        }
3088    };
3089    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3090    match input.item_type() {
3091        // Input must be a thing directly backed by a persist shard, so we can
3092        // use a persist listen to efficiently rehydrate.
3093        CatalogItemType::ContinualTask
3094        | CatalogItemType::Table
3095        | CatalogItemType::MaterializedView
3096        | CatalogItemType::Source => {}
3097        CatalogItemType::Sink
3098        | CatalogItemType::View
3099        | CatalogItemType::Index
3100        | CatalogItemType::Type
3101        | CatalogItemType::Func
3102        | CatalogItemType::Secret
3103        | CatalogItemType::Connection => {
3104            sql_bail!(
3105                "CONTINUAL TASK cannot use {} as an input",
3106                input.item_type()
3107            );
3108        }
3109    }
3110
3111    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3112    let ct_name = stmt.name;
3113    let placeholder_id = match &ct_name {
3114        ResolvedItemName::ContinualTask { id, name } => {
3115            let desc = match desc.as_ref().cloned() {
3116                Some(x) => x,
3117                None => {
3118                    // The user didn't specify the CT's columns. Take a wild
3119                    // guess that the CT has the same shape as the input. It's
3120                    // fine if this is wrong, we'll get an error below after
3121                    // planning the query.
3122                    let input_name = scx.catalog.resolve_full_name(input.name());
3123                    input.desc(&input_name)?.into_owned()
3124                }
3125            };
3126            qcx.ctes.insert(
3127                *id,
3128                CteDesc {
3129                    name: name.item.clone(),
3130                    desc,
3131                },
3132            );
3133            Some(*id)
3134        }
3135        _ => None,
3136    };
3137
3138    let mut exprs = Vec::new();
3139    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3140        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3141        let query::PlannedRootQuery {
3142            expr,
3143            desc: desc_query,
3144            finishing,
3145            scope: _,
3146        } = query::plan_ct_query(&mut qcx, query)?;
3147        // We get back a trivial finishing because we plan with a "maintained"
3148        // QueryLifetime, see comment in `plan_view`.
3149        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3150            &finishing,
3151            expr.arity()
3152        ));
3153        if expr.contains_parameters()? {
3154            if expr.contains_parameters()? {
3155                return Err(PlanError::ParameterNotAllowed(
3156                    "continual tasks".to_string(),
3157                ));
3158            }
3159        }
3160        let expr = match desc.as_mut() {
3161            None => {
3162                desc = Some(desc_query);
3163                expr
3164            }
3165            Some(desc) => {
3166                // We specify the columns for DELETE, so if any columns types don't
3167                // match, it's because it's an INSERT.
3168                if desc_query.arity() > desc.arity() {
3169                    sql_bail!(
3170                        "statement {}: INSERT has more expressions than target columns",
3171                        idx
3172                    );
3173                }
3174                if desc_query.arity() < desc.arity() {
3175                    sql_bail!(
3176                        "statement {}: INSERT has more target columns than expressions",
3177                        idx
3178                    );
3179                }
3180                // Ensure the types of the source query match the types of the target table,
3181                // installing assignment casts where necessary and possible.
3182                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3183                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3184                let expr = expr.map_err(|e| {
3185                    sql_err!(
3186                        "statement {}: column {} is of type {} but expression is of type {}",
3187                        idx,
3188                        desc.get_name(e.column).quoted(),
3189                        qcx.humanize_scalar_type(&e.target_type, false),
3190                        qcx.humanize_scalar_type(&e.source_type, false),
3191                    )
3192                })?;
3193
3194                // Update ct nullability as necessary. The `ne` above verified that the
3195                // types are the same len.
3196                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3197                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3198                if updated {
3199                    let new_types = zip_types().map(|(ct, q)| {
3200                        let mut ct = ct.clone();
3201                        if q.nullable {
3202                            ct.nullable = true;
3203                        }
3204                        ct
3205                    });
3206                    *desc = RelationDesc::from_names_and_types(
3207                        desc.iter_names().cloned().zip_eq(new_types),
3208                    );
3209                }
3210
3211                expr
3212            }
3213        };
3214        match stmt {
3215            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3216            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3217        }
3218    }
3219    // TODO(ct3): Collect things by output and assert that there is only one (or
3220    // support multiple outputs).
3221    let expr = exprs
3222        .into_iter()
3223        .reduce(|acc, expr| acc.union(expr))
3224        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3225    let dependencies = expr
3226        .depends_on()
3227        .into_iter()
3228        .map(|gid| scx.catalog.resolve_item_id(&gid))
3229        .collect();
3230
3231    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3232    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3233    if let Some(dup) = column_names.iter().duplicates().next() {
3234        sql_bail!("column {} specified more than once", dup.quoted());
3235    }
3236
3237    // Check for an object in the catalog with this same name
3238    let name = match &ct_name {
3239        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3240        ResolvedItemName::ContinualTask { name, .. } => {
3241            let name = scx.allocate_qualified_name(name.clone())?;
3242            let full_name = scx.catalog.resolve_full_name(&name);
3243            let partial_name = PartialItemName::from(full_name.clone());
3244            // For PostgreSQL compatibility, we need to prevent creating this when there
3245            // is an existing object *or* type of the same name.
3246            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3247                return Err(PlanError::ItemAlreadyExists {
3248                    name: full_name.to_string(),
3249                    item_type: item.item_type(),
3250                });
3251            }
3252            name
3253        }
3254        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3255        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3256    };
3257
3258    let as_of = stmt.as_of.map(Timestamp::from);
3259    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3260        name,
3261        placeholder_id,
3262        desc,
3263        input_id: input.global_id(),
3264        with_snapshot: snapshot.unwrap_or(true),
3265        continual_task: MaterializedView {
3266            create_sql,
3267            expr,
3268            dependencies,
3269            column_names,
3270            replacement_target: None,
3271            cluster_id,
3272            non_null_assertions: Vec::new(),
3273            compaction_window: None,
3274            refresh_schedule: None,
3275            as_of,
3276        },
3277    }))
3278}
3279
3280fn continual_task_query<'a>(
3281    ct_name: &ResolvedItemName,
3282    stmt: &'a ast::ContinualTaskStmt<Aug>,
3283) -> Option<ast::Query<Aug>> {
3284    match stmt {
3285        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3286            table_name: _,
3287            columns,
3288            source,
3289            returning,
3290        }) => {
3291            if !columns.is_empty() || !returning.is_empty() {
3292                return None;
3293            }
3294            match source {
3295                ast::InsertSource::Query(query) => Some(query.clone()),
3296                ast::InsertSource::DefaultValues => None,
3297            }
3298        }
3299        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3300            table_name: _,
3301            alias,
3302            using,
3303            selection,
3304        }) => {
3305            if !using.is_empty() {
3306                return None;
3307            }
3308            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
3309            let from = ast::TableWithJoins {
3310                relation: ast::TableFactor::Table {
3311                    name: ct_name.clone(),
3312                    alias: alias.clone(),
3313                },
3314                joins: Vec::new(),
3315            };
3316            let select = ast::Select {
3317                from: vec![from],
3318                selection: selection.clone(),
3319                distinct: None,
3320                projection: vec![ast::SelectItem::Wildcard],
3321                group_by: Vec::new(),
3322                having: None,
3323                qualify: None,
3324                options: Vec::new(),
3325            };
3326            let query = ast::Query {
3327                ctes: ast::CteBlock::Simple(Vec::new()),
3328                body: ast::SetExpr::Select(Box::new(select)),
3329                order_by: Vec::new(),
3330                limit: None,
3331                offset: None,
3332            };
3333            // Then negate it to turn it into retractions (after planning it).
3334            Some(query)
3335        }
3336    }
3337}
3338
3339generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3340
3341pub fn describe_create_sink(
3342    _: &StatementContext,
3343    _: CreateSinkStatement<Aug>,
3344) -> Result<StatementDesc, PlanError> {
3345    Ok(StatementDesc::new(None))
3346}
3347
3348generate_extracted_config!(
3349    CreateSinkOption,
3350    (Snapshot, bool),
3351    (PartitionStrategy, String),
3352    (Version, u64),
3353    (CommitInterval, Duration)
3354);
3355
3356pub fn plan_create_sink(
3357    scx: &StatementContext,
3358    stmt: CreateSinkStatement<Aug>,
3359) -> Result<Plan, PlanError> {
3360    // Check for an object in the catalog with this same name
3361    let Some(name) = stmt.name.clone() else {
3362        return Err(PlanError::MissingName(CatalogItemType::Sink));
3363    };
3364    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3365    let full_name = scx.catalog.resolve_full_name(&name);
3366    let partial_name = PartialItemName::from(full_name.clone());
3367    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3368        return Err(PlanError::ItemAlreadyExists {
3369            name: full_name.to_string(),
3370            item_type: item.item_type(),
3371        });
3372    }
3373
3374    plan_sink(scx, stmt)
3375}
3376
3377/// This function will plan a sink as if it does not exist in the catalog. This is so the planning
3378/// logic is reused by both CREATE SINK and ALTER SINK planning. It is the responsibility of the
3379/// callers (plan_create_sink and plan_alter_sink) to check for name collisions if this is
3380/// important.
3381fn plan_sink(
3382    scx: &StatementContext,
3383    mut stmt: CreateSinkStatement<Aug>,
3384) -> Result<Plan, PlanError> {
3385    let CreateSinkStatement {
3386        name,
3387        in_cluster: _,
3388        from,
3389        connection,
3390        format,
3391        envelope,
3392        if_not_exists,
3393        with_options,
3394    } = stmt.clone();
3395
3396    let Some(name) = name else {
3397        return Err(PlanError::MissingName(CatalogItemType::Sink));
3398    };
3399    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3400
3401    let envelope = match envelope {
3402        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3403        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3404        None => sql_bail!("ENVELOPE clause is required"),
3405    };
3406
3407    let from_name = &from;
3408    let from = scx.get_item_by_resolved_name(&from)?;
3409
3410    {
3411        use CatalogItemType::*;
3412        match from.item_type() {
3413            Table | Source | MaterializedView | ContinualTask => {}
3414            Sink | View | Index | Type | Func | Secret | Connection => {
3415                let name = scx.catalog.minimal_qualification(from.name());
3416                return Err(PlanError::InvalidSinkFrom {
3417                    name: name.to_string(),
3418                    item_type: from.item_type(),
3419                });
3420            }
3421        }
3422    }
3423
3424    if from.id().is_system() {
3425        bail_unsupported!("creating a sink directly on a catalog object");
3426    }
3427    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
3428    let key_indices = match &connection {
3429        CreateSinkConnection::Kafka { key: Some(key), .. }
3430        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3431            let key_columns = key
3432                .key_columns
3433                .clone()
3434                .into_iter()
3435                .map(normalize::column_name)
3436                .collect::<Vec<_>>();
3437            let mut uniq = BTreeSet::new();
3438            for col in key_columns.iter() {
3439                if !uniq.insert(col) {
3440                    sql_bail!("duplicate column referenced in KEY: {}", col);
3441                }
3442            }
3443            let indices = key_columns
3444                .iter()
3445                .map(|col| -> anyhow::Result<usize> {
3446                    let name_idx =
3447                        desc.get_by_name(col)
3448                            .map(|(idx, _type)| idx)
3449                            .ok_or_else(|| {
3450                                sql_err!("column referenced in KEY does not exist: {}", col)
3451                            })?;
3452                    if desc.get_unambiguous_name(name_idx).is_none() {
3453                        sql_err!("column referenced in KEY is ambiguous: {}", col);
3454                    }
3455                    Ok(name_idx)
3456                })
3457                .collect::<Result<Vec<_>, _>>()?;
3458            let is_valid_key = desc
3459                .typ()
3460                .keys
3461                .iter()
3462                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3463
3464            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3465                if key.not_enforced {
3466                    scx.catalog
3467                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3468                            key: key_columns.clone(),
3469                            name: name.item.clone(),
3470                        })
3471                } else {
3472                    return Err(PlanError::UpsertSinkWithInvalidKey {
3473                        name: from_name.full_name_str(),
3474                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3475                        valid_keys: desc
3476                            .typ()
3477                            .keys
3478                            .iter()
3479                            .map(|key| {
3480                                key.iter()
3481                                    .map(|col| desc.get_name(*col).as_str().into())
3482                                    .collect()
3483                            })
3484                            .collect(),
3485                    });
3486                }
3487            }
3488            Some(indices)
3489        }
3490        CreateSinkConnection::Kafka { key: None, .. }
3491        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3492    };
3493
3494    let headers_index = match &connection {
3495        CreateSinkConnection::Kafka {
3496            headers: Some(headers),
3497            ..
3498        } => {
3499            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3500
3501            match envelope {
3502                SinkEnvelope::Upsert => (),
3503                SinkEnvelope::Debezium => {
3504                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3505                }
3506            };
3507
3508            let headers = normalize::column_name(headers.clone());
3509            let (idx, ty) = desc
3510                .get_by_name(&headers)
3511                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3512
3513            if desc.get_unambiguous_name(idx).is_none() {
3514                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3515            }
3516
3517            match &ty.scalar_type {
3518                SqlScalarType::Map { value_type, .. }
3519                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3520                _ => sql_bail!(
3521                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3522                ),
3523            }
3524
3525            Some(idx)
3526        }
3527        _ => None,
3528    };
3529
3530    // pick the first valid natural relation key, if any
3531    let relation_key_indices = desc.typ().keys.get(0).cloned();
3532
3533    let key_desc_and_indices = key_indices.map(|key_indices| {
3534        let cols = desc
3535            .iter()
3536            .map(|(name, ty)| (name.clone(), ty.clone()))
3537            .collect::<Vec<_>>();
3538        let (names, types): (Vec<_>, Vec<_>) =
3539            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3540        let typ = SqlRelationType::new(types);
3541        (RelationDesc::new(typ, names), key_indices)
3542    });
3543
3544    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3545        return Err(PlanError::UpsertSinkWithoutKey);
3546    }
3547
3548    let connection_builder = match connection {
3549        CreateSinkConnection::Kafka {
3550            connection,
3551            options,
3552            ..
3553        } => kafka_sink_builder(
3554            scx,
3555            connection,
3556            options,
3557            format,
3558            relation_key_indices,
3559            key_desc_and_indices,
3560            headers_index,
3561            desc.into_owned(),
3562            envelope,
3563            from.id(),
3564        )?,
3565        CreateSinkConnection::Iceberg {
3566            connection,
3567            aws_connection,
3568            options,
3569            ..
3570        } => iceberg_sink_builder(
3571            scx,
3572            connection,
3573            aws_connection,
3574            options,
3575            relation_key_indices,
3576            key_desc_and_indices,
3577        )?,
3578    };
3579
3580    let CreateSinkOptionExtracted {
3581        snapshot,
3582        version,
3583        partition_strategy: _,
3584        seen: _,
3585        commit_interval: _,
3586    } = with_options.try_into()?;
3587
3588    // WITH SNAPSHOT defaults to true
3589    let with_snapshot = snapshot.unwrap_or(true);
3590    // VERSION defaults to 0
3591    let version = version.unwrap_or(0);
3592
3593    // We will rewrite the cluster if one is not provided, so we must use the
3594    // `in_cluster` value we plan to normalize when we canonicalize the create
3595    // statement.
3596    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3597    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3598
3599    Ok(Plan::CreateSink(CreateSinkPlan {
3600        name,
3601        sink: Sink {
3602            create_sql,
3603            from: from.global_id(),
3604            connection: connection_builder,
3605            envelope,
3606            version,
3607        },
3608        with_snapshot,
3609        if_not_exists,
3610        in_cluster: in_cluster.id(),
3611    }))
3612}
3613
3614fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3615    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3616
3617    let existing_keys = desc
3618        .typ()
3619        .keys
3620        .iter()
3621        .map(|key_columns| {
3622            key_columns
3623                .iter()
3624                .map(|col| desc.get_name(*col).as_str())
3625                .join(", ")
3626        })
3627        .join(", ");
3628
3629    sql_err!(
3630        "Key constraint ({}) conflicts with existing key ({})",
3631        user_keys,
3632        existing_keys
3633    )
3634}
3635
3636/// Creating this by hand instead of using generate_extracted_config! macro
3637/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3638#[derive(Debug, Default, PartialEq, Clone)]
3639pub struct CsrConfigOptionExtracted {
3640    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3641    pub(crate) avro_key_fullname: Option<String>,
3642    pub(crate) avro_value_fullname: Option<String>,
3643    pub(crate) null_defaults: bool,
3644    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3645    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3646    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3647    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3648}
3649
3650impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3651    type Error = crate::plan::PlanError;
3652    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3653        let mut extracted = CsrConfigOptionExtracted::default();
3654        let mut common_doc_comments = BTreeMap::new();
3655        for option in v {
3656            if !extracted.seen.insert(option.name.clone()) {
3657                return Err(PlanError::Unstructured({
3658                    format!("{} specified more than once", option.name)
3659                }));
3660            }
3661            let option_name = option.name.clone();
3662            let option_name_str = option_name.to_ast_string_simple();
3663            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3664                option_name: option_name.to_ast_string_simple(),
3665                err: e.into(),
3666            };
3667            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3668                val.map(|s| match s {
3669                    WithOptionValue::Value(Value::String(s)) => {
3670                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3671                    }
3672                    _ => Err("must be a string".to_string()),
3673                })
3674                .transpose()
3675                .map_err(PlanError::Unstructured)
3676                .map_err(better_error)
3677            };
3678            match option.name {
3679                CsrConfigOptionName::AvroKeyFullname => {
3680                    extracted.avro_key_fullname =
3681                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3682                }
3683                CsrConfigOptionName::AvroValueFullname => {
3684                    extracted.avro_value_fullname =
3685                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3686                }
3687                CsrConfigOptionName::NullDefaults => {
3688                    extracted.null_defaults =
3689                        <bool>::try_from_value(option.value).map_err(better_error)?;
3690                }
3691                CsrConfigOptionName::AvroDocOn(doc_on) => {
3692                    let value = String::try_from_value(option.value.ok_or_else(|| {
3693                        PlanError::InvalidOptionValue {
3694                            option_name: option_name_str,
3695                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3696                        }
3697                    })?)
3698                    .map_err(better_error)?;
3699                    let key = match doc_on.identifier {
3700                        DocOnIdentifier::Column(ast::ColumnName {
3701                            relation: ResolvedItemName::Item { id, .. },
3702                            column: ResolvedColumnReference::Column { name, index: _ },
3703                        }) => DocTarget::Field {
3704                            object_id: id,
3705                            column_name: name,
3706                        },
3707                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3708                            DocTarget::Type(id)
3709                        }
3710                        _ => unreachable!(),
3711                    };
3712
3713                    match doc_on.for_schema {
3714                        DocOnSchema::KeyOnly => {
3715                            extracted.key_doc_options.insert(key, value);
3716                        }
3717                        DocOnSchema::ValueOnly => {
3718                            extracted.value_doc_options.insert(key, value);
3719                        }
3720                        DocOnSchema::All => {
3721                            common_doc_comments.insert(key, value);
3722                        }
3723                    }
3724                }
3725                CsrConfigOptionName::KeyCompatibilityLevel => {
3726                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3727                }
3728                CsrConfigOptionName::ValueCompatibilityLevel => {
3729                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3730                }
3731            }
3732        }
3733
3734        for (key, value) in common_doc_comments {
3735            if !extracted.key_doc_options.contains_key(&key) {
3736                extracted.key_doc_options.insert(key.clone(), value.clone());
3737            }
3738            if !extracted.value_doc_options.contains_key(&key) {
3739                extracted.value_doc_options.insert(key, value);
3740            }
3741        }
3742        Ok(extracted)
3743    }
3744}
3745
3746fn iceberg_sink_builder(
3747    scx: &StatementContext,
3748    catalog_connection: ResolvedItemName,
3749    aws_connection: ResolvedItemName,
3750    options: Vec<IcebergSinkConfigOption<Aug>>,
3751    relation_key_indices: Option<Vec<usize>>,
3752    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3753) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3754    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3755    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3756    let catalog_connection_id = catalog_connection_item.id();
3757    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3758    let aws_connection_id = aws_connection_item.id();
3759    if !matches!(
3760        catalog_connection_item.connection()?,
3761        Connection::IcebergCatalog(_)
3762    ) {
3763        sql_bail!(
3764            "{} is not an iceberg catalog connection",
3765            scx.catalog
3766                .resolve_full_name(catalog_connection_item.name())
3767                .to_string()
3768                .quoted()
3769        );
3770    };
3771
3772    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3773        sql_bail!(
3774            "{} is not an AWS connection",
3775            scx.catalog
3776                .resolve_full_name(aws_connection_item.name())
3777                .to_string()
3778                .quoted()
3779        );
3780    }
3781
3782    let IcebergSinkConfigOptionExtracted {
3783        table,
3784        namespace,
3785        seen: _,
3786    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3787
3788    let Some(table) = table else {
3789        sql_bail!("Iceberg sink must specify TABLE");
3790    };
3791    let Some(namespace) = namespace else {
3792        sql_bail!("Iceberg sink must specify NAMESPACE");
3793    };
3794
3795    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3796        catalog_connection_id,
3797        catalog_connection: catalog_connection_id,
3798        aws_connection_id,
3799        aws_connection: aws_connection_id,
3800        table,
3801        namespace,
3802        relation_key_indices,
3803        key_desc_and_indices,
3804    }))
3805}
3806
3807fn kafka_sink_builder(
3808    scx: &StatementContext,
3809    connection: ResolvedItemName,
3810    options: Vec<KafkaSinkConfigOption<Aug>>,
3811    format: Option<FormatSpecifier<Aug>>,
3812    relation_key_indices: Option<Vec<usize>>,
3813    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3814    headers_index: Option<usize>,
3815    value_desc: RelationDesc,
3816    envelope: SinkEnvelope,
3817    sink_from: CatalogItemId,
3818) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3819    // Get Kafka connection.
3820    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3821    let connection_id = connection_item.id();
3822    match connection_item.connection()? {
3823        Connection::Kafka(_) => (),
3824        _ => sql_bail!(
3825            "{} is not a kafka connection",
3826            scx.catalog.resolve_full_name(connection_item.name())
3827        ),
3828    };
3829
3830    let KafkaSinkConfigOptionExtracted {
3831        topic,
3832        compression_type,
3833        partition_by,
3834        progress_group_id_prefix,
3835        transactional_id_prefix,
3836        legacy_ids,
3837        topic_config,
3838        topic_metadata_refresh_interval,
3839        topic_partition_count,
3840        topic_replication_factor,
3841        seen: _,
3842    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3843
3844    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3845        (Some(_), Some(true)) => {
3846            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3847        }
3848        (None, Some(true)) => KafkaIdStyle::Legacy,
3849        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3850    };
3851
3852    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3853        (Some(_), Some(true)) => {
3854            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3855        }
3856        (None, Some(true)) => KafkaIdStyle::Legacy,
3857        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3858    };
3859
3860    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3861
3862    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3863        // This is a librdkafka-enforced restriction that, if violated,
3864        // would result in a runtime error for the source.
3865        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3866    }
3867
3868    let assert_positive = |val: Option<i32>, name: &str| {
3869        if let Some(val) = val {
3870            if val <= 0 {
3871                sql_bail!("{} must be a positive integer", name);
3872            }
3873        }
3874        val.map(NonNeg::try_from)
3875            .transpose()
3876            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3877    };
3878    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3879    let topic_replication_factor =
3880        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3881
3882    // Helper method to parse avro connection options for format specifiers that use avro
3883    // for either key or value encoding.
3884    let gen_avro_schema_options = |conn| {
3885        let CsrConnectionAvro {
3886            connection:
3887                CsrConnection {
3888                    connection,
3889                    options,
3890                },
3891            seed,
3892            key_strategy,
3893            value_strategy,
3894        } = conn;
3895        if seed.is_some() {
3896            sql_bail!("SEED option does not make sense with sinks");
3897        }
3898        if key_strategy.is_some() {
3899            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3900        }
3901        if value_strategy.is_some() {
3902            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3903        }
3904
3905        let item = scx.get_item_by_resolved_name(&connection)?;
3906        let csr_connection = match item.connection()? {
3907            Connection::Csr(_) => item.id(),
3908            _ => {
3909                sql_bail!(
3910                    "{} is not a schema registry connection",
3911                    scx.catalog
3912                        .resolve_full_name(item.name())
3913                        .to_string()
3914                        .quoted()
3915                )
3916            }
3917        };
3918        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3919
3920        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3921            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3922        }
3923
3924        if key_desc_and_indices.is_some()
3925            && (extracted_options.avro_key_fullname.is_some()
3926                ^ extracted_options.avro_value_fullname.is_some())
3927        {
3928            sql_bail!(
3929                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3930            );
3931        }
3932
3933        Ok((csr_connection, extracted_options))
3934    };
3935
3936    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3937        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3938        Format::Bytes if desc.arity() == 1 => {
3939            let col_type = &desc.typ().column_types[0].scalar_type;
3940            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3941                bail_unsupported!(format!(
3942                    "BYTES format with non-encodable type: {:?}",
3943                    col_type
3944                ));
3945            }
3946
3947            Ok(KafkaSinkFormatType::Bytes)
3948        }
3949        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3950        Format::Bytes | Format::Text => {
3951            bail_unsupported!("BYTES or TEXT format with multiple columns")
3952        }
3953        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3954        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3955            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3956            let schema = if is_key {
3957                AvroSchemaGenerator::new(
3958                    desc.clone(),
3959                    false,
3960                    options.key_doc_options,
3961                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3962                    options.null_defaults,
3963                    Some(sink_from),
3964                    false,
3965                )?
3966                .schema()
3967                .to_string()
3968            } else {
3969                AvroSchemaGenerator::new(
3970                    desc.clone(),
3971                    matches!(envelope, SinkEnvelope::Debezium),
3972                    options.value_doc_options,
3973                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3974                    options.null_defaults,
3975                    Some(sink_from),
3976                    true,
3977                )?
3978                .schema()
3979                .to_string()
3980            };
3981            Ok(KafkaSinkFormatType::Avro {
3982                schema,
3983                compatibility_level: if is_key {
3984                    options.key_compatibility_level
3985                } else {
3986                    options.value_compatibility_level
3987                },
3988                csr_connection,
3989            })
3990        }
3991        format => bail_unsupported!(format!("sink format {:?}", format)),
3992    };
3993
3994    let partition_by = match &partition_by {
3995        Some(partition_by) => {
3996            let mut scope = Scope::from_source(None, value_desc.iter_names());
3997
3998            match envelope {
3999                SinkEnvelope::Upsert => (),
4000                SinkEnvelope::Debezium => {
4001                    let key_indices: HashSet<_> = key_desc_and_indices
4002                        .as_ref()
4003                        .map(|(_desc, indices)| indices.as_slice())
4004                        .unwrap_or_default()
4005                        .into_iter()
4006                        .collect();
4007                    for (i, item) in scope.items.iter_mut().enumerate() {
4008                        if !key_indices.contains(&i) {
4009                            item.error_if_referenced = Some(|_table, column| {
4010                                PlanError::InvalidPartitionByEnvelopeDebezium {
4011                                    column_name: column.to_string(),
4012                                }
4013                            });
4014                        }
4015                    }
4016                }
4017            };
4018
4019            let ecx = &ExprContext {
4020                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
4021                name: "PARTITION BY",
4022                scope: &scope,
4023                relation_type: value_desc.typ(),
4024                allow_aggregates: false,
4025                allow_subqueries: false,
4026                allow_parameters: false,
4027                allow_windows: false,
4028            };
4029            let expr = plan_expr(ecx, partition_by)?.cast_to(
4030                ecx,
4031                CastContext::Assignment,
4032                &SqlScalarType::UInt64,
4033            )?;
4034            let expr = expr.lower_uncorrelated()?;
4035
4036            Some(expr)
4037        }
4038        _ => None,
4039    };
4040
4041    // Map from the format specifier of the statement to the individual key/value formats for the sink.
4042    let format = match format {
4043        Some(FormatSpecifier::KeyValue { key, value }) => {
4044            let key_format = match key_desc_and_indices.as_ref() {
4045                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4046                None => None,
4047            };
4048            KafkaSinkFormat {
4049                value_format: map_format(value, &value_desc, false)?,
4050                key_format,
4051            }
4052        }
4053        Some(FormatSpecifier::Bare(format)) => {
4054            let key_format = match key_desc_and_indices.as_ref() {
4055                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4056                None => None,
4057            };
4058            KafkaSinkFormat {
4059                value_format: map_format(format, &value_desc, false)?,
4060                key_format,
4061            }
4062        }
4063        None => bail_unsupported!("sink without format"),
4064    };
4065
4066    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4067        connection_id,
4068        connection: connection_id,
4069        format,
4070        topic: topic_name,
4071        relation_key_indices,
4072        key_desc_and_indices,
4073        headers_index,
4074        value_desc,
4075        partition_by,
4076        compression_type,
4077        progress_group_id,
4078        transactional_id,
4079        topic_options: KafkaTopicOptions {
4080            partition_count: topic_partition_count,
4081            replication_factor: topic_replication_factor,
4082            topic_config: topic_config.unwrap_or_default(),
4083        },
4084        topic_metadata_refresh_interval,
4085    }))
4086}
4087
4088pub fn describe_create_index(
4089    _: &StatementContext,
4090    _: CreateIndexStatement<Aug>,
4091) -> Result<StatementDesc, PlanError> {
4092    Ok(StatementDesc::new(None))
4093}
4094
4095pub fn plan_create_index(
4096    scx: &StatementContext,
4097    mut stmt: CreateIndexStatement<Aug>,
4098) -> Result<Plan, PlanError> {
4099    let CreateIndexStatement {
4100        name,
4101        on_name,
4102        in_cluster,
4103        key_parts,
4104        with_options,
4105        if_not_exists,
4106    } = &mut stmt;
4107    let on = scx.get_item_by_resolved_name(on_name)?;
4108    let on_item_type = on.item_type();
4109
4110    if !matches!(
4111        on_item_type,
4112        CatalogItemType::View
4113            | CatalogItemType::MaterializedView
4114            | CatalogItemType::Source
4115            | CatalogItemType::Table
4116    ) {
4117        sql_bail!(
4118            "index cannot be created on {} because it is a {}",
4119            on_name.full_name_str(),
4120            on.item_type()
4121        )
4122    }
4123
4124    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
4125
4126    let filled_key_parts = match key_parts {
4127        Some(kp) => kp.to_vec(),
4128        None => {
4129            // `key_parts` is None if we're creating a "default" index.
4130            on.desc(&scx.catalog.resolve_full_name(on.name()))?
4131                .typ()
4132                .default_key()
4133                .iter()
4134                .map(|i| match on_desc.get_unambiguous_name(*i) {
4135                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
4136                    _ => Expr::Value(Value::Number((i + 1).to_string())),
4137                })
4138                .collect()
4139        }
4140    };
4141    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4142
4143    let index_name = if let Some(name) = name {
4144        QualifiedItemName {
4145            qualifiers: on.name().qualifiers.clone(),
4146            item: normalize::ident(name.clone()),
4147        }
4148    } else {
4149        let mut idx_name = QualifiedItemName {
4150            qualifiers: on.name().qualifiers.clone(),
4151            item: on.name().item.clone(),
4152        };
4153        if key_parts.is_none() {
4154            // We're trying to create the "default" index.
4155            idx_name.item += "_primary_idx";
4156        } else {
4157            // Use PG schema for automatically naming indexes:
4158            // `<table>_<_-separated indexed expressions>_idx`
4159            let index_name_col_suffix = keys
4160                .iter()
4161                .map(|k| match k {
4162                    mz_expr::MirScalarExpr::Column(i, name) => {
4163                        match (on_desc.get_unambiguous_name(*i), &name.0) {
4164                            (Some(col_name), _) => col_name.to_string(),
4165                            (None, Some(name)) => name.to_string(),
4166                            (None, None) => format!("{}", i + 1),
4167                        }
4168                    }
4169                    _ => "expr".to_string(),
4170                })
4171                .join("_");
4172            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4173                .expect("write on strings cannot fail");
4174            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4175        }
4176
4177        if !*if_not_exists {
4178            scx.catalog.find_available_name(idx_name)
4179        } else {
4180            idx_name
4181        }
4182    };
4183
4184    // Check for an object in the catalog with this same name
4185    let full_name = scx.catalog.resolve_full_name(&index_name);
4186    let partial_name = PartialItemName::from(full_name.clone());
4187    // For PostgreSQL compatibility, we need to prevent creating indexes when
4188    // there is an existing object *or* type of the same name.
4189    //
4190    // Technically, we only need to prevent coexistence of indexes and types
4191    // that have an associated relation (record types but not list/map types).
4192    // Enforcing that would be more complicated, though. It's backwards
4193    // compatible to weaken this restriction in the future.
4194    if let (Ok(item), false, false) = (
4195        scx.catalog.resolve_item_or_type(&partial_name),
4196        *if_not_exists,
4197        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4198    ) {
4199        return Err(PlanError::ItemAlreadyExists {
4200            name: full_name.to_string(),
4201            item_type: item.item_type(),
4202        });
4203    }
4204
4205    let options = plan_index_options(scx, with_options.clone())?;
4206    let cluster_id = match in_cluster {
4207        None => scx.resolve_cluster(None)?.id(),
4208        Some(in_cluster) => in_cluster.id,
4209    };
4210
4211    *in_cluster = Some(ResolvedClusterName {
4212        id: cluster_id,
4213        print_name: None,
4214    });
4215
4216    // Normalize `stmt`.
4217    *name = Some(Ident::new(index_name.item.clone())?);
4218    *key_parts = Some(filled_key_parts);
4219    let if_not_exists = *if_not_exists;
4220
4221    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4222    let compaction_window = options.iter().find_map(|o| {
4223        #[allow(irrefutable_let_patterns)]
4224        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4225            Some(lcw.clone())
4226        } else {
4227            None
4228        }
4229    });
4230
4231    Ok(Plan::CreateIndex(CreateIndexPlan {
4232        name: index_name,
4233        index: Index {
4234            create_sql,
4235            on: on.global_id(),
4236            keys,
4237            cluster_id,
4238            compaction_window,
4239        },
4240        if_not_exists,
4241    }))
4242}
4243
4244pub fn describe_create_type(
4245    _: &StatementContext,
4246    _: CreateTypeStatement<Aug>,
4247) -> Result<StatementDesc, PlanError> {
4248    Ok(StatementDesc::new(None))
4249}
4250
4251pub fn plan_create_type(
4252    scx: &StatementContext,
4253    stmt: CreateTypeStatement<Aug>,
4254) -> Result<Plan, PlanError> {
4255    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4256    let CreateTypeStatement { name, as_type, .. } = stmt;
4257
4258    fn validate_data_type(
4259        scx: &StatementContext,
4260        data_type: ResolvedDataType,
4261        as_type: &str,
4262        key: &str,
4263    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4264        let (id, modifiers) = match data_type {
4265            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4266            _ => sql_bail!(
4267                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4268                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4269                as_type,
4270                key,
4271                data_type.human_readable_name(),
4272            ),
4273        };
4274
4275        let item = scx.catalog.get_item(&id);
4276        match item.type_details() {
4277            None => sql_bail!(
4278                "{} must be of class type, but received {} which is of class {}",
4279                key,
4280                scx.catalog.resolve_full_name(item.name()),
4281                item.item_type()
4282            ),
4283            Some(CatalogTypeDetails {
4284                typ: CatalogType::Char,
4285                ..
4286            }) => {
4287                bail_unsupported!("embedding char type in a list or map")
4288            }
4289            _ => {
4290                // Validate that the modifiers are actually valid.
4291                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4292
4293                Ok((id, modifiers))
4294            }
4295        }
4296    }
4297
4298    let inner = match as_type {
4299        CreateTypeAs::List { options } => {
4300            let CreateTypeListOptionExtracted {
4301                element_type,
4302                seen: _,
4303            } = CreateTypeListOptionExtracted::try_from(options)?;
4304            let element_type =
4305                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4306            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4307            CatalogType::List {
4308                element_reference: id,
4309                element_modifiers: modifiers,
4310            }
4311        }
4312        CreateTypeAs::Map { options } => {
4313            let CreateTypeMapOptionExtracted {
4314                key_type,
4315                value_type,
4316                seen: _,
4317            } = CreateTypeMapOptionExtracted::try_from(options)?;
4318            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4319            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4320            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4321            let (value_id, value_modifiers) =
4322                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4323            CatalogType::Map {
4324                key_reference: key_id,
4325                key_modifiers,
4326                value_reference: value_id,
4327                value_modifiers,
4328            }
4329        }
4330        CreateTypeAs::Record { column_defs } => {
4331            let mut fields = vec![];
4332            for column_def in column_defs {
4333                let data_type = column_def.data_type;
4334                let key = ident(column_def.name.clone());
4335                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4336                fields.push(CatalogRecordField {
4337                    name: ColumnName::from(key.clone()),
4338                    type_reference: id,
4339                    type_modifiers: modifiers,
4340                });
4341            }
4342            CatalogType::Record { fields }
4343        }
4344    };
4345
4346    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4347
4348    // Check for an object in the catalog with this same name
4349    let full_name = scx.catalog.resolve_full_name(&name);
4350    let partial_name = PartialItemName::from(full_name.clone());
4351    // For PostgreSQL compatibility, we need to prevent creating types when
4352    // there is an existing object *or* type of the same name.
4353    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4354        if item.item_type().conflicts_with_type() {
4355            return Err(PlanError::ItemAlreadyExists {
4356                name: full_name.to_string(),
4357                item_type: item.item_type(),
4358            });
4359        }
4360    }
4361
4362    Ok(Plan::CreateType(CreateTypePlan {
4363        name,
4364        typ: Type { create_sql, inner },
4365    }))
4366}
4367
4368generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4369
4370generate_extracted_config!(
4371    CreateTypeMapOption,
4372    (KeyType, ResolvedDataType),
4373    (ValueType, ResolvedDataType)
4374);
4375
4376#[derive(Debug)]
4377pub enum PlannedAlterRoleOption {
4378    Attributes(PlannedRoleAttributes),
4379    Variable(PlannedRoleVariable),
4380}
4381
4382#[derive(Debug, Clone)]
4383pub struct PlannedRoleAttributes {
4384    pub inherit: Option<bool>,
4385    pub password: Option<Password>,
4386    pub scram_iterations: Option<NonZeroU32>,
4387    /// `nopassword` is set to true if the password is from the parser is None.
4388    /// This is semantically different than not supplying a password at all,
4389    /// to allow for unsetting a password.
4390    pub nopassword: Option<bool>,
4391    pub superuser: Option<bool>,
4392    pub login: Option<bool>,
4393}
4394
4395fn plan_role_attributes(
4396    options: Vec<RoleAttribute>,
4397    scx: &StatementContext,
4398) -> Result<PlannedRoleAttributes, PlanError> {
4399    let mut planned_attributes = PlannedRoleAttributes {
4400        inherit: None,
4401        password: None,
4402        scram_iterations: None,
4403        superuser: None,
4404        login: None,
4405        nopassword: None,
4406    };
4407
4408    for option in options {
4409        match option {
4410            RoleAttribute::Inherit | RoleAttribute::NoInherit
4411                if planned_attributes.inherit.is_some() =>
4412            {
4413                sql_bail!("conflicting or redundant options");
4414            }
4415            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4416                bail_never_supported!(
4417                    "CREATECLUSTER attribute",
4418                    "sql/create-role/#details",
4419                    "Use system privileges instead."
4420                );
4421            }
4422            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4423                bail_never_supported!(
4424                    "CREATEDB attribute",
4425                    "sql/create-role/#details",
4426                    "Use system privileges instead."
4427                );
4428            }
4429            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4430                bail_never_supported!(
4431                    "CREATEROLE attribute",
4432                    "sql/create-role/#details",
4433                    "Use system privileges instead."
4434                );
4435            }
4436            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4437                sql_bail!("conflicting or redundant options");
4438            }
4439
4440            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4441            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4442            RoleAttribute::Password(password) => {
4443                if let Some(password) = password {
4444                    planned_attributes.password = Some(password.into());
4445                    planned_attributes.scram_iterations =
4446                        Some(scx.catalog.system_vars().scram_iterations())
4447                } else {
4448                    planned_attributes.nopassword = Some(true);
4449                }
4450            }
4451            RoleAttribute::SuperUser => {
4452                if planned_attributes.superuser == Some(false) {
4453                    sql_bail!("conflicting or redundant options");
4454                }
4455                planned_attributes.superuser = Some(true);
4456            }
4457            RoleAttribute::NoSuperUser => {
4458                if planned_attributes.superuser == Some(true) {
4459                    sql_bail!("conflicting or redundant options");
4460                }
4461                planned_attributes.superuser = Some(false);
4462            }
4463            RoleAttribute::Login => {
4464                if planned_attributes.login == Some(false) {
4465                    sql_bail!("conflicting or redundant options");
4466                }
4467                planned_attributes.login = Some(true);
4468            }
4469            RoleAttribute::NoLogin => {
4470                if planned_attributes.login == Some(true) {
4471                    sql_bail!("conflicting or redundant options");
4472                }
4473                planned_attributes.login = Some(false);
4474            }
4475        }
4476    }
4477    if planned_attributes.inherit == Some(false) {
4478        bail_unsupported!("non inherit roles");
4479    }
4480
4481    Ok(planned_attributes)
4482}
4483
4484#[derive(Debug)]
4485pub enum PlannedRoleVariable {
4486    Set { name: String, value: VariableValue },
4487    Reset { name: String },
4488}
4489
4490impl PlannedRoleVariable {
4491    pub fn name(&self) -> &str {
4492        match self {
4493            PlannedRoleVariable::Set { name, .. } => name,
4494            PlannedRoleVariable::Reset { name } => name,
4495        }
4496    }
4497}
4498
4499fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4500    let plan = match variable {
4501        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4502            name: name.to_string(),
4503            value: scl::plan_set_variable_to(value)?,
4504        },
4505        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4506            name: name.to_string(),
4507        },
4508    };
4509    Ok(plan)
4510}
4511
4512pub fn describe_create_role(
4513    _: &StatementContext,
4514    _: CreateRoleStatement,
4515) -> Result<StatementDesc, PlanError> {
4516    Ok(StatementDesc::new(None))
4517}
4518
4519pub fn plan_create_role(
4520    scx: &StatementContext,
4521    CreateRoleStatement { name, options }: CreateRoleStatement,
4522) -> Result<Plan, PlanError> {
4523    let attributes = plan_role_attributes(options, scx)?;
4524    Ok(Plan::CreateRole(CreateRolePlan {
4525        name: normalize::ident(name),
4526        attributes: attributes.into(),
4527    }))
4528}
4529
4530pub fn plan_create_network_policy(
4531    ctx: &StatementContext,
4532    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4533) -> Result<Plan, PlanError> {
4534    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4535    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4536
4537    let Some(rule_defs) = policy_options.rules else {
4538        sql_bail!("RULES must be specified when creating network policies.");
4539    };
4540
4541    let mut rules = vec![];
4542    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4543        let NetworkPolicyRuleOptionExtracted {
4544            seen: _,
4545            direction,
4546            action,
4547            address,
4548        } = options.try_into()?;
4549        let (direction, action, address) = match (direction, action, address) {
4550            (Some(direction), Some(action), Some(address)) => (
4551                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4552                NetworkPolicyRuleAction::try_from(action.as_str())?,
4553                PolicyAddress::try_from(address.as_str())?,
4554            ),
4555            (_, _, _) => {
4556                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4557            }
4558        };
4559        rules.push(NetworkPolicyRule {
4560            name: normalize::ident(name),
4561            direction,
4562            action,
4563            address,
4564        });
4565    }
4566
4567    if rules.len()
4568        > ctx
4569            .catalog
4570            .system_vars()
4571            .max_rules_per_network_policy()
4572            .try_into()?
4573    {
4574        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4575    }
4576
4577    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4578        name: normalize::ident(name),
4579        rules,
4580    }))
4581}
4582
4583pub fn plan_alter_network_policy(
4584    ctx: &StatementContext,
4585    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4586) -> Result<Plan, PlanError> {
4587    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4588
4589    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4590    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4591
4592    let Some(rule_defs) = policy_options.rules else {
4593        sql_bail!("RULES must be specified when creating network policies.");
4594    };
4595
4596    let mut rules = vec![];
4597    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4598        let NetworkPolicyRuleOptionExtracted {
4599            seen: _,
4600            direction,
4601            action,
4602            address,
4603        } = options.try_into()?;
4604
4605        let (direction, action, address) = match (direction, action, address) {
4606            (Some(direction), Some(action), Some(address)) => (
4607                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4608                NetworkPolicyRuleAction::try_from(action.as_str())?,
4609                PolicyAddress::try_from(address.as_str())?,
4610            ),
4611            (_, _, _) => {
4612                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4613            }
4614        };
4615        rules.push(NetworkPolicyRule {
4616            name: normalize::ident(name),
4617            direction,
4618            action,
4619            address,
4620        });
4621    }
4622    if rules.len()
4623        > ctx
4624            .catalog
4625            .system_vars()
4626            .max_rules_per_network_policy()
4627            .try_into()?
4628    {
4629        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4630    }
4631
4632    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4633        id: policy.id(),
4634        name: normalize::ident(name),
4635        rules,
4636    }))
4637}
4638
4639pub fn describe_create_cluster(
4640    _: &StatementContext,
4641    _: CreateClusterStatement<Aug>,
4642) -> Result<StatementDesc, PlanError> {
4643    Ok(StatementDesc::new(None))
4644}
4645
4646// WARNING:
4647// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4648// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4649// that option stays the same. If you were to give a default value here, then not giving that option
4650// to ALTER CLUSTER would always reset the value of that option to the default.
4651generate_extracted_config!(
4652    ClusterOption,
4653    (AvailabilityZones, Vec<String>),
4654    (Disk, bool),
4655    (IntrospectionDebugging, bool),
4656    (IntrospectionInterval, OptionalDuration),
4657    (Managed, bool),
4658    (Replicas, Vec<ReplicaDefinition<Aug>>),
4659    (ReplicationFactor, u32),
4660    (Size, String),
4661    (Schedule, ClusterScheduleOptionValue),
4662    (WorkloadClass, OptionalString)
4663);
4664
4665generate_extracted_config!(
4666    NetworkPolicyOption,
4667    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4668);
4669
4670generate_extracted_config!(
4671    NetworkPolicyRuleOption,
4672    (Direction, String),
4673    (Action, String),
4674    (Address, String)
4675);
4676
4677generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4678
4679generate_extracted_config!(
4680    ClusterAlterUntilReadyOption,
4681    (Timeout, Duration),
4682    (OnTimeout, String)
4683);
4684
4685generate_extracted_config!(
4686    ClusterFeature,
4687    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4688    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4689    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4690    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4691    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4692    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4693    (
4694        EnableProjectionPushdownAfterRelationCse,
4695        Option<bool>,
4696        Default(None)
4697    )
4698);
4699
4700/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4701///
4702/// The reverse of [`unplan_create_cluster`].
4703pub fn plan_create_cluster(
4704    scx: &StatementContext,
4705    stmt: CreateClusterStatement<Aug>,
4706) -> Result<Plan, PlanError> {
4707    let plan = plan_create_cluster_inner(scx, stmt)?;
4708
4709    // Roundtrip through unplan and make sure that we end up with the same plan.
4710    if let CreateClusterVariant::Managed(_) = &plan.variant {
4711        let stmt = unplan_create_cluster(scx, plan.clone())
4712            .map_err(|e| PlanError::Replan(e.to_string()))?;
4713        let create_sql = stmt.to_ast_string_stable();
4714        let stmt = parse::parse(&create_sql)
4715            .map_err(|e| PlanError::Replan(e.to_string()))?
4716            .into_element()
4717            .ast;
4718        let (stmt, _resolved_ids) =
4719            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4720        let stmt = match stmt {
4721            Statement::CreateCluster(stmt) => stmt,
4722            stmt => {
4723                return Err(PlanError::Replan(format!(
4724                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4725                )));
4726            }
4727        };
4728        let replan =
4729            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4730        if plan != replan {
4731            return Err(PlanError::Replan(format!(
4732                "replan does not match: plan={plan:?}, replan={replan:?}"
4733            )));
4734        }
4735    }
4736
4737    Ok(Plan::CreateCluster(plan))
4738}
4739
4740pub fn plan_create_cluster_inner(
4741    scx: &StatementContext,
4742    CreateClusterStatement {
4743        name,
4744        options,
4745        features,
4746    }: CreateClusterStatement<Aug>,
4747) -> Result<CreateClusterPlan, PlanError> {
4748    let ClusterOptionExtracted {
4749        availability_zones,
4750        introspection_debugging,
4751        introspection_interval,
4752        managed,
4753        replicas,
4754        replication_factor,
4755        seen: _,
4756        size,
4757        disk,
4758        schedule,
4759        workload_class,
4760    }: ClusterOptionExtracted = options.try_into()?;
4761
4762    let managed = managed.unwrap_or_else(|| replicas.is_none());
4763
4764    if !scx.catalog.active_role_id().is_system() {
4765        if !features.is_empty() {
4766            sql_bail!("FEATURES not supported for non-system users");
4767        }
4768        if workload_class.is_some() {
4769            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4770        }
4771    }
4772
4773    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4774    let workload_class = workload_class.and_then(|v| v.0);
4775
4776    if managed {
4777        if replicas.is_some() {
4778            sql_bail!("REPLICAS not supported for managed clusters");
4779        }
4780        let Some(size) = size else {
4781            sql_bail!("SIZE must be specified for managed clusters");
4782        };
4783
4784        if disk.is_some() {
4785            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4786            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4787            // we'll be able to remove the `DISK` option entirely.
4788            if scx.catalog.is_cluster_size_cc(&size) {
4789                sql_bail!(
4790                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4791                );
4792            }
4793
4794            scx.catalog
4795                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4796        }
4797
4798        let compute = plan_compute_replica_config(
4799            introspection_interval,
4800            introspection_debugging.unwrap_or(false),
4801        )?;
4802
4803        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4804            replication_factor.unwrap_or_else(|| {
4805                scx.catalog
4806                    .system_vars()
4807                    .default_cluster_replication_factor()
4808            })
4809        } else {
4810            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4811            if replication_factor.is_some() {
4812                sql_bail!(
4813                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4814                );
4815            }
4816            // If we have a non-trivial schedule, then let's not have any replicas initially,
4817            // to avoid quickly going back and forth if the schedule doesn't want a replica
4818            // initially.
4819            0
4820        };
4821        let availability_zones = availability_zones.unwrap_or_default();
4822
4823        if !availability_zones.is_empty() {
4824            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4825        }
4826
4827        // Plan OptimizerFeatureOverrides.
4828        let ClusterFeatureExtracted {
4829            reoptimize_imported_views,
4830            enable_eager_delta_joins,
4831            enable_new_outer_join_lowering,
4832            enable_variadic_left_join_lowering,
4833            enable_letrec_fixpoint_analysis,
4834            enable_join_prioritize_arranged,
4835            enable_projection_pushdown_after_relation_cse,
4836            seen: _,
4837        } = ClusterFeatureExtracted::try_from(features)?;
4838        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4839            reoptimize_imported_views,
4840            enable_eager_delta_joins,
4841            enable_new_outer_join_lowering,
4842            enable_variadic_left_join_lowering,
4843            enable_letrec_fixpoint_analysis,
4844            enable_join_prioritize_arranged,
4845            enable_projection_pushdown_after_relation_cse,
4846            ..Default::default()
4847        };
4848
4849        let schedule = plan_cluster_schedule(schedule)?;
4850
4851        Ok(CreateClusterPlan {
4852            name: normalize::ident(name),
4853            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4854                replication_factor,
4855                size,
4856                availability_zones,
4857                compute,
4858                optimizer_feature_overrides,
4859                schedule,
4860            }),
4861            workload_class,
4862        })
4863    } else {
4864        let Some(replica_defs) = replicas else {
4865            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4866        };
4867        if availability_zones.is_some() {
4868            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4869        }
4870        if replication_factor.is_some() {
4871            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4872        }
4873        if introspection_debugging.is_some() {
4874            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4875        }
4876        if introspection_interval.is_some() {
4877            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4878        }
4879        if size.is_some() {
4880            sql_bail!("SIZE not supported for unmanaged clusters");
4881        }
4882        if disk.is_some() {
4883            sql_bail!("DISK not supported for unmanaged clusters");
4884        }
4885        if !features.is_empty() {
4886            sql_bail!("FEATURES not supported for unmanaged clusters");
4887        }
4888        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4889            sql_bail!(
4890                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4891            );
4892        }
4893
4894        let mut replicas = vec![];
4895        for ReplicaDefinition { name, options } in replica_defs {
4896            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4897        }
4898
4899        Ok(CreateClusterPlan {
4900            name: normalize::ident(name),
4901            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4902            workload_class,
4903        })
4904    }
4905}
4906
4907/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4908///
4909/// The reverse of [`plan_create_cluster`].
4910pub fn unplan_create_cluster(
4911    scx: &StatementContext,
4912    CreateClusterPlan {
4913        name,
4914        variant,
4915        workload_class,
4916    }: CreateClusterPlan,
4917) -> Result<CreateClusterStatement<Aug>, PlanError> {
4918    match variant {
4919        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4920            replication_factor,
4921            size,
4922            availability_zones,
4923            compute,
4924            optimizer_feature_overrides,
4925            schedule,
4926        }) => {
4927            let schedule = unplan_cluster_schedule(schedule);
4928            let OptimizerFeatureOverrides {
4929                enable_guard_subquery_tablefunc: _,
4930                enable_consolidate_after_union_negate: _,
4931                enable_reduce_mfp_fusion: _,
4932                enable_cardinality_estimates: _,
4933                persist_fast_path_limit: _,
4934                reoptimize_imported_views,
4935                enable_eager_delta_joins,
4936                enable_new_outer_join_lowering,
4937                enable_variadic_left_join_lowering,
4938                enable_letrec_fixpoint_analysis,
4939                enable_reduce_reduction: _,
4940                enable_join_prioritize_arranged,
4941                enable_projection_pushdown_after_relation_cse,
4942                enable_less_reduce_in_eqprop: _,
4943                enable_dequadratic_eqprop_map: _,
4944                enable_eq_classes_withholding_errors: _,
4945                enable_fast_path_plan_insights: _,
4946            } = optimizer_feature_overrides;
4947            // The ones from above that don't occur below are not wired up to cluster features.
4948            let features_extracted = ClusterFeatureExtracted {
4949                // Seen is ignored when unplanning.
4950                seen: Default::default(),
4951                reoptimize_imported_views,
4952                enable_eager_delta_joins,
4953                enable_new_outer_join_lowering,
4954                enable_variadic_left_join_lowering,
4955                enable_letrec_fixpoint_analysis,
4956                enable_join_prioritize_arranged,
4957                enable_projection_pushdown_after_relation_cse,
4958            };
4959            let features = features_extracted.into_values(scx.catalog);
4960            let availability_zones = if availability_zones.is_empty() {
4961                None
4962            } else {
4963                Some(availability_zones)
4964            };
4965            let (introspection_interval, introspection_debugging) =
4966                unplan_compute_replica_config(compute);
4967            // Replication factor cannot be explicitly specified with a refresh schedule, it's
4968            // always 1 or less.
4969            let replication_factor = match &schedule {
4970                ClusterScheduleOptionValue::Manual => Some(replication_factor),
4971                ClusterScheduleOptionValue::Refresh { .. } => {
4972                    assert!(
4973                        replication_factor <= 1,
4974                        "replication factor, {replication_factor:?}, must be <= 1"
4975                    );
4976                    None
4977                }
4978            };
4979            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4980            let options_extracted = ClusterOptionExtracted {
4981                // Seen is ignored when unplanning.
4982                seen: Default::default(),
4983                availability_zones,
4984                disk: None,
4985                introspection_debugging: Some(introspection_debugging),
4986                introspection_interval,
4987                managed: Some(true),
4988                replicas: None,
4989                replication_factor,
4990                size: Some(size),
4991                schedule: Some(schedule),
4992                workload_class,
4993            };
4994            let options = options_extracted.into_values(scx.catalog);
4995            let name = Ident::new_unchecked(name);
4996            Ok(CreateClusterStatement {
4997                name,
4998                options,
4999                features,
5000            })
5001        }
5002        CreateClusterVariant::Unmanaged(_) => {
5003            bail_unsupported!("SHOW CREATE for unmanaged clusters")
5004        }
5005    }
5006}
5007
5008generate_extracted_config!(
5009    ReplicaOption,
5010    (AvailabilityZone, String),
5011    (BilledAs, String),
5012    (ComputeAddresses, Vec<String>),
5013    (ComputectlAddresses, Vec<String>),
5014    (Disk, bool),
5015    (Internal, bool, Default(false)),
5016    (IntrospectionDebugging, bool, Default(false)),
5017    (IntrospectionInterval, OptionalDuration),
5018    (Size, String),
5019    (StorageAddresses, Vec<String>),
5020    (StoragectlAddresses, Vec<String>),
5021    (Workers, u16)
5022);
5023
5024fn plan_replica_config(
5025    scx: &StatementContext,
5026    options: Vec<ReplicaOption<Aug>>,
5027) -> Result<ReplicaConfig, PlanError> {
5028    let ReplicaOptionExtracted {
5029        availability_zone,
5030        billed_as,
5031        computectl_addresses,
5032        disk,
5033        internal,
5034        introspection_debugging,
5035        introspection_interval,
5036        size,
5037        storagectl_addresses,
5038        ..
5039    }: ReplicaOptionExtracted = options.try_into()?;
5040
5041    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
5042
5043    match (
5044        size,
5045        availability_zone,
5046        billed_as,
5047        storagectl_addresses,
5048        computectl_addresses,
5049    ) {
5050        // Common cases we expect end users to hit.
5051        (None, _, None, None, None) => {
5052            // We don't mention the unmanaged options in the error message
5053            // because they are only available in unsafe mode.
5054            sql_bail!("SIZE option must be specified");
5055        }
5056        (Some(size), availability_zone, billed_as, None, None) => {
5057            if disk.is_some() {
5058                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
5059                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
5060                // we'll be able to remove the `DISK` option entirely.
5061                if scx.catalog.is_cluster_size_cc(&size) {
5062                    sql_bail!(
5063                        "DISK option not supported for modern cluster sizes because disk is always enabled"
5064                    );
5065                }
5066
5067                scx.catalog
5068                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5069            }
5070
5071            Ok(ReplicaConfig::Orchestrated {
5072                size,
5073                availability_zone,
5074                compute,
5075                billed_as,
5076                internal,
5077            })
5078        }
5079
5080        (None, None, None, storagectl_addresses, computectl_addresses) => {
5081            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5082
5083            // When manually testing Materialize in unsafe mode, it's easy to
5084            // accidentally omit one of these options, so we try to produce
5085            // helpful error messages.
5086            let Some(storagectl_addrs) = storagectl_addresses else {
5087                sql_bail!("missing STORAGECTL ADDRESSES option");
5088            };
5089            let Some(computectl_addrs) = computectl_addresses else {
5090                sql_bail!("missing COMPUTECTL ADDRESSES option");
5091            };
5092
5093            if storagectl_addrs.len() != computectl_addrs.len() {
5094                sql_bail!(
5095                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5096                );
5097            }
5098
5099            if disk.is_some() {
5100                sql_bail!("DISK can't be specified for unorchestrated clusters");
5101            }
5102
5103            Ok(ReplicaConfig::Unorchestrated {
5104                storagectl_addrs,
5105                computectl_addrs,
5106                compute,
5107            })
5108        }
5109        _ => {
5110            // We don't bother trying to produce a more helpful error message
5111            // here because no user is likely to hit this path.
5112            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5113        }
5114    }
5115}
5116
5117/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5118///
5119/// The reverse of [`unplan_compute_replica_config`].
5120fn plan_compute_replica_config(
5121    introspection_interval: Option<OptionalDuration>,
5122    introspection_debugging: bool,
5123) -> Result<ComputeReplicaConfig, PlanError> {
5124    let introspection_interval = introspection_interval
5125        .map(|OptionalDuration(i)| i)
5126        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5127    let introspection = match introspection_interval {
5128        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5129            interval,
5130            debugging: introspection_debugging,
5131        }),
5132        None if introspection_debugging => {
5133            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5134        }
5135        None => None,
5136    };
5137    let compute = ComputeReplicaConfig { introspection };
5138    Ok(compute)
5139}
5140
5141/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5142///
5143/// The reverse of [`plan_compute_replica_config`].
5144fn unplan_compute_replica_config(
5145    compute_replica_config: ComputeReplicaConfig,
5146) -> (Option<OptionalDuration>, bool) {
5147    match compute_replica_config.introspection {
5148        Some(ComputeReplicaIntrospectionConfig {
5149            debugging,
5150            interval,
5151        }) => (Some(OptionalDuration(Some(interval))), debugging),
5152        None => (Some(OptionalDuration(None)), false),
5153    }
5154}
5155
5156/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5157///
5158/// The reverse of [`unplan_cluster_schedule`].
5159fn plan_cluster_schedule(
5160    schedule: ClusterScheduleOptionValue,
5161) -> Result<ClusterSchedule, PlanError> {
5162    Ok(match schedule {
5163        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5164        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5165        ClusterScheduleOptionValue::Refresh {
5166            hydration_time_estimate: None,
5167        } => ClusterSchedule::Refresh {
5168            hydration_time_estimate: Duration::from_millis(0),
5169        },
5170        // Otherwise we convert the `IntervalValue` to a `Duration`.
5171        ClusterScheduleOptionValue::Refresh {
5172            hydration_time_estimate: Some(interval_value),
5173        } => {
5174            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5175            if interval.as_microseconds() < 0 {
5176                sql_bail!(
5177                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5178                    interval
5179                );
5180            }
5181            if interval.months != 0 {
5182                // This limitation is because we want this interval to be cleanly convertable
5183                // to a unix epoch timestamp difference. When the interval involves months, then
5184                // this is not true anymore, because months have variable lengths.
5185                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5186            }
5187            let duration = interval.duration()?;
5188            if u64::try_from(duration.as_millis()).is_err()
5189                || Interval::from_duration(&duration).is_err()
5190            {
5191                sql_bail!("HYDRATION TIME ESTIMATE too large");
5192            }
5193            ClusterSchedule::Refresh {
5194                hydration_time_estimate: duration,
5195            }
5196        }
5197    })
5198}
5199
5200/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5201///
5202/// The reverse of [`plan_cluster_schedule`].
5203fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5204    match schedule {
5205        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5206        ClusterSchedule::Refresh {
5207            hydration_time_estimate,
5208        } => {
5209            let interval = Interval::from_duration(&hydration_time_estimate)
5210                .expect("planning ensured that this is convertible back to Interval");
5211            let interval_value = literal::unplan_interval(&interval);
5212            ClusterScheduleOptionValue::Refresh {
5213                hydration_time_estimate: Some(interval_value),
5214            }
5215        }
5216    }
5217}
5218
5219pub fn describe_create_cluster_replica(
5220    _: &StatementContext,
5221    _: CreateClusterReplicaStatement<Aug>,
5222) -> Result<StatementDesc, PlanError> {
5223    Ok(StatementDesc::new(None))
5224}
5225
5226pub fn plan_create_cluster_replica(
5227    scx: &StatementContext,
5228    CreateClusterReplicaStatement {
5229        definition: ReplicaDefinition { name, options },
5230        of_cluster,
5231    }: CreateClusterReplicaStatement<Aug>,
5232) -> Result<Plan, PlanError> {
5233    let cluster = scx
5234        .catalog
5235        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5236    let current_replica_count = cluster.replica_ids().iter().count();
5237    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5238        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5239        return Err(PlanError::CreateReplicaFailStorageObjects {
5240            current_replica_count,
5241            internal_replica_count,
5242            hypothetical_replica_count: current_replica_count + 1,
5243        });
5244    }
5245
5246    let config = plan_replica_config(scx, options)?;
5247
5248    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5249        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5250            return Err(PlanError::MangedReplicaName(name.into_string()));
5251        }
5252    } else {
5253        ensure_cluster_is_not_managed(scx, cluster.id())?;
5254    }
5255
5256    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5257        name: normalize::ident(name),
5258        cluster_id: cluster.id(),
5259        config,
5260    }))
5261}
5262
5263pub fn describe_create_secret(
5264    _: &StatementContext,
5265    _: CreateSecretStatement<Aug>,
5266) -> Result<StatementDesc, PlanError> {
5267    Ok(StatementDesc::new(None))
5268}
5269
5270pub fn plan_create_secret(
5271    scx: &StatementContext,
5272    stmt: CreateSecretStatement<Aug>,
5273) -> Result<Plan, PlanError> {
5274    let CreateSecretStatement {
5275        name,
5276        if_not_exists,
5277        value,
5278    } = &stmt;
5279
5280    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5281    let mut create_sql_statement = stmt.clone();
5282    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5283    let create_sql =
5284        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5285    let secret_as = query::plan_secret_as(scx, value.clone())?;
5286
5287    let secret = Secret {
5288        create_sql,
5289        secret_as,
5290    };
5291
5292    Ok(Plan::CreateSecret(CreateSecretPlan {
5293        name,
5294        secret,
5295        if_not_exists: *if_not_exists,
5296    }))
5297}
5298
5299pub fn describe_create_connection(
5300    _: &StatementContext,
5301    _: CreateConnectionStatement<Aug>,
5302) -> Result<StatementDesc, PlanError> {
5303    Ok(StatementDesc::new(None))
5304}
5305
5306generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5307
5308pub fn plan_create_connection(
5309    scx: &StatementContext,
5310    mut stmt: CreateConnectionStatement<Aug>,
5311) -> Result<Plan, PlanError> {
5312    let CreateConnectionStatement {
5313        name,
5314        connection_type,
5315        values,
5316        if_not_exists,
5317        with_options,
5318    } = stmt.clone();
5319    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5320    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5321    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5322
5323    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5324    if options.validate.is_some() {
5325        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5326    }
5327    let validate = match options.validate {
5328        Some(val) => val,
5329        None => {
5330            scx.catalog
5331                .system_vars()
5332                .enable_default_connection_validation()
5333                && details.to_connection().validate_by_default()
5334        }
5335    };
5336
5337    // Check for an object in the catalog with this same name
5338    let full_name = scx.catalog.resolve_full_name(&name);
5339    let partial_name = PartialItemName::from(full_name.clone());
5340    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5341        return Err(PlanError::ItemAlreadyExists {
5342            name: full_name.to_string(),
5343            item_type: item.item_type(),
5344        });
5345    }
5346
5347    // For SSH connections, overwrite the public key options based on the
5348    // connection details, in case we generated new keys during planning.
5349    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5350        stmt.values.retain(|v| {
5351            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5352        });
5353        stmt.values.push(ConnectionOption {
5354            name: ConnectionOptionName::PublicKey1,
5355            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5356        });
5357        stmt.values.push(ConnectionOption {
5358            name: ConnectionOptionName::PublicKey2,
5359            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5360        });
5361    }
5362    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5363
5364    let plan = CreateConnectionPlan {
5365        name,
5366        if_not_exists,
5367        connection: crate::plan::Connection {
5368            create_sql,
5369            details,
5370        },
5371        validate,
5372    };
5373    Ok(Plan::CreateConnection(plan))
5374}
5375
5376fn plan_drop_database(
5377    scx: &StatementContext,
5378    if_exists: bool,
5379    name: &UnresolvedDatabaseName,
5380    cascade: bool,
5381) -> Result<Option<DatabaseId>, PlanError> {
5382    Ok(match resolve_database(scx, name, if_exists)? {
5383        Some(database) => {
5384            if !cascade && database.has_schemas() {
5385                sql_bail!(
5386                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5387                    name,
5388                );
5389            }
5390            Some(database.id())
5391        }
5392        None => None,
5393    })
5394}
5395
5396pub fn describe_drop_objects(
5397    _: &StatementContext,
5398    _: DropObjectsStatement,
5399) -> Result<StatementDesc, PlanError> {
5400    Ok(StatementDesc::new(None))
5401}
5402
5403pub fn plan_drop_objects(
5404    scx: &mut StatementContext,
5405    DropObjectsStatement {
5406        object_type,
5407        if_exists,
5408        names,
5409        cascade,
5410    }: DropObjectsStatement,
5411) -> Result<Plan, PlanError> {
5412    assert_ne!(
5413        object_type,
5414        mz_sql_parser::ast::ObjectType::Func,
5415        "rejected in parser"
5416    );
5417    let object_type = object_type.into();
5418
5419    let mut referenced_ids = Vec::new();
5420    for name in names {
5421        let id = match &name {
5422            UnresolvedObjectName::Cluster(name) => {
5423                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5424            }
5425            UnresolvedObjectName::ClusterReplica(name) => {
5426                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5427            }
5428            UnresolvedObjectName::Database(name) => {
5429                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5430            }
5431            UnresolvedObjectName::Schema(name) => {
5432                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5433            }
5434            UnresolvedObjectName::Role(name) => {
5435                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5436            }
5437            UnresolvedObjectName::Item(name) => {
5438                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5439                    .map(ObjectId::Item)
5440            }
5441            UnresolvedObjectName::NetworkPolicy(name) => {
5442                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5443            }
5444        };
5445        match id {
5446            Some(id) => referenced_ids.push(id),
5447            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5448                name: name.to_ast_string_simple(),
5449                object_type,
5450            }),
5451        }
5452    }
5453    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5454
5455    Ok(Plan::DropObjects(DropObjectsPlan {
5456        referenced_ids,
5457        drop_ids,
5458        object_type,
5459    }))
5460}
5461
5462fn plan_drop_schema(
5463    scx: &StatementContext,
5464    if_exists: bool,
5465    name: &UnresolvedSchemaName,
5466    cascade: bool,
5467) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5468    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5469        Some((database_spec, schema_spec)) => {
5470            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5471                sql_bail!(
5472                    "cannot drop schema {name} because it is required by the database system",
5473                );
5474            }
5475            if let SchemaSpecifier::Temporary = schema_spec {
5476                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5477            }
5478            let schema = scx.get_schema(&database_spec, &schema_spec);
5479            if !cascade && schema.has_items() {
5480                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5481                sql_bail!(
5482                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5483                    full_schema_name
5484                );
5485            }
5486            Some((database_spec, schema_spec))
5487        }
5488        None => None,
5489    })
5490}
5491
5492fn plan_drop_role(
5493    scx: &StatementContext,
5494    if_exists: bool,
5495    name: &Ident,
5496) -> Result<Option<RoleId>, PlanError> {
5497    match scx.catalog.resolve_role(name.as_str()) {
5498        Ok(role) => {
5499            let id = role.id();
5500            if &id == scx.catalog.active_role_id() {
5501                sql_bail!("current role cannot be dropped");
5502            }
5503            for role in scx.catalog.get_roles() {
5504                for (member_id, grantor_id) in role.membership() {
5505                    if &id == grantor_id {
5506                        let member_role = scx.catalog.get_role(member_id);
5507                        sql_bail!(
5508                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5509                            name.as_str(),
5510                            role.name(),
5511                            member_role.name()
5512                        );
5513                    }
5514                }
5515            }
5516            Ok(Some(role.id()))
5517        }
5518        Err(_) if if_exists => Ok(None),
5519        Err(e) => Err(e.into()),
5520    }
5521}
5522
5523fn plan_drop_cluster(
5524    scx: &StatementContext,
5525    if_exists: bool,
5526    name: &Ident,
5527    cascade: bool,
5528) -> Result<Option<ClusterId>, PlanError> {
5529    Ok(match resolve_cluster(scx, name, if_exists)? {
5530        Some(cluster) => {
5531            if !cascade && !cluster.bound_objects().is_empty() {
5532                return Err(PlanError::DependentObjectsStillExist {
5533                    object_type: "cluster".to_string(),
5534                    object_name: cluster.name().to_string(),
5535                    dependents: Vec::new(),
5536                });
5537            }
5538            Some(cluster.id())
5539        }
5540        None => None,
5541    })
5542}
5543
5544fn plan_drop_network_policy(
5545    scx: &StatementContext,
5546    if_exists: bool,
5547    name: &Ident,
5548) -> Result<Option<NetworkPolicyId>, PlanError> {
5549    match scx.catalog.resolve_network_policy(name.as_str()) {
5550        Ok(policy) => {
5551            // TODO(network_policy): When we support role based network policies, check if any role
5552            // currently has the specified policy set.
5553            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5554                Err(PlanError::NetworkPolicyInUse)
5555            } else {
5556                Ok(Some(policy.id()))
5557            }
5558        }
5559        Err(_) if if_exists => Ok(None),
5560        Err(e) => Err(e.into()),
5561    }
5562}
5563
5564/// Returns `true` if the cluster has any object that requires a single replica.
5565/// Returns `false` if the cluster has no objects.
5566fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5567    // If this feature is enabled then all objects support multiple-replicas
5568    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5569        false
5570    } else {
5571        // Othewise we check for the existence of sources or sinks
5572        cluster.bound_objects().iter().any(|id| {
5573            let item = scx.catalog.get_item(id);
5574            matches!(
5575                item.item_type(),
5576                CatalogItemType::Sink | CatalogItemType::Source
5577            )
5578        })
5579    }
5580}
5581
5582fn plan_drop_cluster_replica(
5583    scx: &StatementContext,
5584    if_exists: bool,
5585    name: &QualifiedReplica,
5586) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5587    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5588    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5589}
5590
5591/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5592fn plan_drop_item(
5593    scx: &StatementContext,
5594    object_type: ObjectType,
5595    if_exists: bool,
5596    name: UnresolvedItemName,
5597    cascade: bool,
5598) -> Result<Option<CatalogItemId>, PlanError> {
5599    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5600        Ok(r) => r,
5601        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5602        Err(PlanError::MismatchedObjectType {
5603            name,
5604            is_type: ObjectType::MaterializedView,
5605            expected_type: ObjectType::View,
5606        }) => {
5607            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5608        }
5609        e => e?,
5610    };
5611
5612    Ok(match resolved {
5613        Some(catalog_item) => {
5614            if catalog_item.id().is_system() {
5615                sql_bail!(
5616                    "cannot drop {} {} because it is required by the database system",
5617                    catalog_item.item_type(),
5618                    scx.catalog.minimal_qualification(catalog_item.name()),
5619                );
5620            }
5621
5622            if !cascade {
5623                for id in catalog_item.used_by() {
5624                    let dep = scx.catalog.get_item(id);
5625                    if dependency_prevents_drop(object_type, dep) {
5626                        return Err(PlanError::DependentObjectsStillExist {
5627                            object_type: catalog_item.item_type().to_string(),
5628                            object_name: scx
5629                                .catalog
5630                                .minimal_qualification(catalog_item.name())
5631                                .to_string(),
5632                            dependents: vec![(
5633                                dep.item_type().to_string(),
5634                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5635                            )],
5636                        });
5637                    }
5638                }
5639                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5640                //  relies on entry. Unfortunately, we don't have that information readily available.
5641            }
5642            Some(catalog_item.id())
5643        }
5644        None => None,
5645    })
5646}
5647
5648/// Does the dependency `dep` prevent a drop of a non-cascade query?
5649fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5650    match object_type {
5651        ObjectType::Type => true,
5652        _ => match dep.item_type() {
5653            CatalogItemType::Func
5654            | CatalogItemType::Table
5655            | CatalogItemType::Source
5656            | CatalogItemType::View
5657            | CatalogItemType::MaterializedView
5658            | CatalogItemType::Sink
5659            | CatalogItemType::Type
5660            | CatalogItemType::Secret
5661            | CatalogItemType::Connection
5662            | CatalogItemType::ContinualTask => true,
5663            CatalogItemType::Index => false,
5664        },
5665    }
5666}
5667
5668pub fn describe_alter_index_options(
5669    _: &StatementContext,
5670    _: AlterIndexStatement<Aug>,
5671) -> Result<StatementDesc, PlanError> {
5672    Ok(StatementDesc::new(None))
5673}
5674
5675pub fn describe_drop_owned(
5676    _: &StatementContext,
5677    _: DropOwnedStatement<Aug>,
5678) -> Result<StatementDesc, PlanError> {
5679    Ok(StatementDesc::new(None))
5680}
5681
5682pub fn plan_drop_owned(
5683    scx: &StatementContext,
5684    drop: DropOwnedStatement<Aug>,
5685) -> Result<Plan, PlanError> {
5686    let cascade = drop.cascade();
5687    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5688    let mut drop_ids = Vec::new();
5689    let mut privilege_revokes = Vec::new();
5690    let mut default_privilege_revokes = Vec::new();
5691
5692    fn update_privilege_revokes(
5693        object_id: SystemObjectId,
5694        privileges: &PrivilegeMap,
5695        role_ids: &BTreeSet<RoleId>,
5696        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5697    ) {
5698        privilege_revokes.extend(iter::zip(
5699            iter::repeat(object_id),
5700            privileges
5701                .all_values()
5702                .filter(|privilege| role_ids.contains(&privilege.grantee))
5703                .cloned(),
5704        ));
5705    }
5706
5707    // Replicas
5708    for replica in scx.catalog.get_cluster_replicas() {
5709        if role_ids.contains(&replica.owner_id()) {
5710            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5711        }
5712    }
5713
5714    // Clusters
5715    for cluster in scx.catalog.get_clusters() {
5716        if role_ids.contains(&cluster.owner_id()) {
5717            // Note: CASCADE is not required for replicas.
5718            if !cascade {
5719                let non_owned_bound_objects: Vec<_> = cluster
5720                    .bound_objects()
5721                    .into_iter()
5722                    .map(|item_id| scx.catalog.get_item(item_id))
5723                    .filter(|item| !role_ids.contains(&item.owner_id()))
5724                    .collect();
5725                if !non_owned_bound_objects.is_empty() {
5726                    let names: Vec<_> = non_owned_bound_objects
5727                        .into_iter()
5728                        .map(|item| {
5729                            (
5730                                item.item_type().to_string(),
5731                                scx.catalog.resolve_full_name(item.name()).to_string(),
5732                            )
5733                        })
5734                        .collect();
5735                    return Err(PlanError::DependentObjectsStillExist {
5736                        object_type: "cluster".to_string(),
5737                        object_name: cluster.name().to_string(),
5738                        dependents: names,
5739                    });
5740                }
5741            }
5742            drop_ids.push(cluster.id().into());
5743        }
5744        update_privilege_revokes(
5745            SystemObjectId::Object(cluster.id().into()),
5746            cluster.privileges(),
5747            &role_ids,
5748            &mut privilege_revokes,
5749        );
5750    }
5751
5752    // Items
5753    for item in scx.catalog.get_items() {
5754        if role_ids.contains(&item.owner_id()) {
5755            if !cascade {
5756                // Checks if any items still depend on this one, returning an error if so.
5757                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5758                    let non_owned_dependencies: Vec<_> = used_by
5759                        .into_iter()
5760                        .map(|item_id| scx.catalog.get_item(item_id))
5761                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5762                        .filter(|item| !role_ids.contains(&item.owner_id()))
5763                        .collect();
5764                    if !non_owned_dependencies.is_empty() {
5765                        let names: Vec<_> = non_owned_dependencies
5766                            .into_iter()
5767                            .map(|item| {
5768                                let item_typ = item.item_type().to_string();
5769                                let item_name =
5770                                    scx.catalog.resolve_full_name(item.name()).to_string();
5771                                (item_typ, item_name)
5772                            })
5773                            .collect();
5774                        Err(PlanError::DependentObjectsStillExist {
5775                            object_type: item.item_type().to_string(),
5776                            object_name: scx
5777                                .catalog
5778                                .resolve_full_name(item.name())
5779                                .to_string()
5780                                .to_string(),
5781                            dependents: names,
5782                        })
5783                    } else {
5784                        Ok(())
5785                    }
5786                };
5787
5788                // When this item gets dropped it will also drop its progress source, so we need to
5789                // check the users of those.
5790                if let Some(id) = item.progress_id() {
5791                    let progress_item = scx.catalog.get_item(&id);
5792                    check_if_dependents_exist(progress_item.used_by())?;
5793                }
5794                check_if_dependents_exist(item.used_by())?;
5795            }
5796            drop_ids.push(item.id().into());
5797        }
5798        update_privilege_revokes(
5799            SystemObjectId::Object(item.id().into()),
5800            item.privileges(),
5801            &role_ids,
5802            &mut privilege_revokes,
5803        );
5804    }
5805
5806    // Schemas
5807    for schema in scx.catalog.get_schemas() {
5808        if !schema.id().is_temporary() {
5809            if role_ids.contains(&schema.owner_id()) {
5810                if !cascade {
5811                    let non_owned_dependencies: Vec<_> = schema
5812                        .item_ids()
5813                        .map(|item_id| scx.catalog.get_item(&item_id))
5814                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5815                        .filter(|item| !role_ids.contains(&item.owner_id()))
5816                        .collect();
5817                    if !non_owned_dependencies.is_empty() {
5818                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5819                        sql_bail!(
5820                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5821                            full_schema_name.to_string().quoted()
5822                        );
5823                    }
5824                }
5825                drop_ids.push((*schema.database(), *schema.id()).into())
5826            }
5827            update_privilege_revokes(
5828                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5829                schema.privileges(),
5830                &role_ids,
5831                &mut privilege_revokes,
5832            );
5833        }
5834    }
5835
5836    // Databases
5837    for database in scx.catalog.get_databases() {
5838        if role_ids.contains(&database.owner_id()) {
5839            if !cascade {
5840                let non_owned_schemas: Vec<_> = database
5841                    .schemas()
5842                    .into_iter()
5843                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5844                    .collect();
5845                if !non_owned_schemas.is_empty() {
5846                    sql_bail!(
5847                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5848                        database.name().quoted(),
5849                    );
5850                }
5851            }
5852            drop_ids.push(database.id().into());
5853        }
5854        update_privilege_revokes(
5855            SystemObjectId::Object(database.id().into()),
5856            database.privileges(),
5857            &role_ids,
5858            &mut privilege_revokes,
5859        );
5860    }
5861
5862    // System
5863    update_privilege_revokes(
5864        SystemObjectId::System,
5865        scx.catalog.get_system_privileges(),
5866        &role_ids,
5867        &mut privilege_revokes,
5868    );
5869
5870    for (default_privilege_object, default_privilege_acl_items) in
5871        scx.catalog.get_default_privileges()
5872    {
5873        for default_privilege_acl_item in default_privilege_acl_items {
5874            if role_ids.contains(&default_privilege_object.role_id)
5875                || role_ids.contains(&default_privilege_acl_item.grantee)
5876            {
5877                default_privilege_revokes.push((
5878                    default_privilege_object.clone(),
5879                    default_privilege_acl_item.clone(),
5880                ));
5881            }
5882        }
5883    }
5884
5885    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5886
5887    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5888    if !system_ids.is_empty() {
5889        let mut owners = system_ids
5890            .into_iter()
5891            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5892            .collect::<BTreeSet<_>>()
5893            .into_iter()
5894            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5895        sql_bail!(
5896            "cannot drop objects owned by role {} because they are required by the database system",
5897            owners.join(", "),
5898        );
5899    }
5900
5901    Ok(Plan::DropOwned(DropOwnedPlan {
5902        role_ids: role_ids.into_iter().collect(),
5903        drop_ids,
5904        privilege_revokes,
5905        default_privilege_revokes,
5906    }))
5907}
5908
5909fn plan_retain_history_option(
5910    scx: &StatementContext,
5911    retain_history: Option<OptionalDuration>,
5912) -> Result<Option<CompactionWindow>, PlanError> {
5913    if let Some(OptionalDuration(lcw)) = retain_history {
5914        Ok(Some(plan_retain_history(scx, lcw)?))
5915    } else {
5916        Ok(None)
5917    }
5918}
5919
5920// Convert a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5921// `DisableCompaction`. A zero duration will error. This is because the `OptionalDuration` type
5922// already converts the zero duration into `None`. This function must not be called in the `RESET
5923// (RETAIN HISTORY)` path, which should be handled by the outer `Option<OptionalDuration>` being
5924// `None`.
5925fn plan_retain_history(
5926    scx: &StatementContext,
5927    lcw: Option<Duration>,
5928) -> Result<CompactionWindow, PlanError> {
5929    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5930    match lcw {
5931        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
5932        // disable compaction), and should never occur here. Furthermore, some things actually do
5933        // break when this is set to real zero:
5934        // https://github.com/MaterializeInc/database-issues/issues/3798.
5935        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5936            option_name: "RETAIN HISTORY".to_string(),
5937            err: Box::new(PlanError::Unstructured(
5938                "internal error: unexpectedly zero".to_string(),
5939            )),
5940        }),
5941        Some(duration) => {
5942            // Error if the duration is low and enable_unlimited_retain_history is not set (which
5943            // should only be possible during testing).
5944            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5945                && scx
5946                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5947                    .is_err()
5948            {
5949                return Err(PlanError::RetainHistoryLow {
5950                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5951                });
5952            }
5953            Ok(duration.try_into()?)
5954        }
5955        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction seems
5956        // to be a bad choice, so prevent it.
5957        None => {
5958            if scx
5959                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5960                .is_err()
5961            {
5962                Err(PlanError::RetainHistoryRequired)
5963            } else {
5964                Ok(CompactionWindow::DisableCompaction)
5965            }
5966        }
5967    }
5968}
5969
5970generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5971
5972fn plan_index_options(
5973    scx: &StatementContext,
5974    with_opts: Vec<IndexOption<Aug>>,
5975) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5976    if !with_opts.is_empty() {
5977        // Index options are not durable.
5978        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5979    }
5980
5981    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5982
5983    let mut out = Vec::with_capacity(1);
5984    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5985        out.push(crate::plan::IndexOption::RetainHistory(cw));
5986    }
5987    Ok(out)
5988}
5989
5990generate_extracted_config!(
5991    TableOption,
5992    (PartitionBy, Vec<Ident>),
5993    (RetainHistory, OptionalDuration),
5994    (RedactedTest, String)
5995);
5996
5997fn plan_table_options(
5998    scx: &StatementContext,
5999    desc: &RelationDesc,
6000    with_opts: Vec<TableOption<Aug>>,
6001) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6002    let TableOptionExtracted {
6003        partition_by,
6004        retain_history,
6005        redacted_test,
6006        ..
6007    }: TableOptionExtracted = with_opts.try_into()?;
6008
6009    if let Some(partition_by) = partition_by {
6010        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6011        check_partition_by(desc, partition_by)?;
6012    }
6013
6014    if redacted_test.is_some() {
6015        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6016    }
6017
6018    let mut out = Vec::with_capacity(1);
6019    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6020        out.push(crate::plan::TableOption::RetainHistory(cw));
6021    }
6022    Ok(out)
6023}
6024
6025pub fn plan_alter_index_options(
6026    scx: &mut StatementContext,
6027    AlterIndexStatement {
6028        index_name,
6029        if_exists,
6030        action,
6031    }: AlterIndexStatement<Aug>,
6032) -> Result<Plan, PlanError> {
6033    let object_type = ObjectType::Index;
6034    match action {
6035        AlterIndexAction::ResetOptions(options) => {
6036            let mut options = options.into_iter();
6037            if let Some(opt) = options.next() {
6038                match opt {
6039                    IndexOptionName::RetainHistory => {
6040                        if options.next().is_some() {
6041                            sql_bail!("RETAIN HISTORY must be only option");
6042                        }
6043                        return alter_retain_history(
6044                            scx,
6045                            object_type,
6046                            if_exists,
6047                            UnresolvedObjectName::Item(index_name),
6048                            None,
6049                        );
6050                    }
6051                }
6052            }
6053            sql_bail!("expected option");
6054        }
6055        AlterIndexAction::SetOptions(options) => {
6056            let mut options = options.into_iter();
6057            if let Some(opt) = options.next() {
6058                match opt.name {
6059                    IndexOptionName::RetainHistory => {
6060                        if options.next().is_some() {
6061                            sql_bail!("RETAIN HISTORY must be only option");
6062                        }
6063                        return alter_retain_history(
6064                            scx,
6065                            object_type,
6066                            if_exists,
6067                            UnresolvedObjectName::Item(index_name),
6068                            opt.value,
6069                        );
6070                    }
6071                }
6072            }
6073            sql_bail!("expected option");
6074        }
6075    }
6076}
6077
6078pub fn describe_alter_cluster_set_options(
6079    _: &StatementContext,
6080    _: AlterClusterStatement<Aug>,
6081) -> Result<StatementDesc, PlanError> {
6082    Ok(StatementDesc::new(None))
6083}
6084
6085pub fn plan_alter_cluster(
6086    scx: &mut StatementContext,
6087    AlterClusterStatement {
6088        name,
6089        action,
6090        if_exists,
6091    }: AlterClusterStatement<Aug>,
6092) -> Result<Plan, PlanError> {
6093    let cluster = match resolve_cluster(scx, &name, if_exists)? {
6094        Some(entry) => entry,
6095        None => {
6096            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6097                name: name.to_ast_string_simple(),
6098                object_type: ObjectType::Cluster,
6099            });
6100
6101            return Ok(Plan::AlterNoop(AlterNoopPlan {
6102                object_type: ObjectType::Cluster,
6103            }));
6104        }
6105    };
6106
6107    let mut options: PlanClusterOption = Default::default();
6108    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6109
6110    match action {
6111        AlterClusterAction::SetOptions {
6112            options: set_options,
6113            with_options,
6114        } => {
6115            let ClusterOptionExtracted {
6116                availability_zones,
6117                introspection_debugging,
6118                introspection_interval,
6119                managed,
6120                replicas: replica_defs,
6121                replication_factor,
6122                seen: _,
6123                size,
6124                disk,
6125                schedule,
6126                workload_class,
6127            }: ClusterOptionExtracted = set_options.try_into()?;
6128
6129            if !scx.catalog.active_role_id().is_system() {
6130                if workload_class.is_some() {
6131                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6132                }
6133            }
6134
6135            match managed.unwrap_or_else(|| cluster.is_managed()) {
6136                true => {
6137                    let alter_strategy_extracted =
6138                        ClusterAlterOptionExtracted::try_from(with_options)?;
6139                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6140
6141                    match alter_strategy {
6142                        AlterClusterPlanStrategy::None => {}
6143                        _ => {
6144                            scx.require_feature_flag(
6145                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6146                            )?;
6147                        }
6148                    }
6149
6150                    if replica_defs.is_some() {
6151                        sql_bail!("REPLICAS not supported for managed clusters");
6152                    }
6153                    if schedule.is_some()
6154                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6155                    {
6156                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6157                    }
6158
6159                    if let Some(replication_factor) = replication_factor {
6160                        if schedule.is_some()
6161                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6162                        {
6163                            sql_bail!(
6164                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6165                            );
6166                        }
6167                        if let Some(current_schedule) = cluster.schedule() {
6168                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6169                                sql_bail!(
6170                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6171                                );
6172                            }
6173                        }
6174
6175                        let internal_replica_count =
6176                            cluster.replicas().iter().filter(|r| r.internal()).count();
6177                        let hypothetical_replica_count =
6178                            internal_replica_count + usize::cast_from(replication_factor);
6179
6180                        // Total number of replicas running is internal replicas
6181                        // + replication factor.
6182                        if contains_single_replica_objects(scx, cluster)
6183                            && hypothetical_replica_count > 1
6184                        {
6185                            return Err(PlanError::CreateReplicaFailStorageObjects {
6186                                current_replica_count: cluster.replica_ids().iter().count(),
6187                                internal_replica_count,
6188                                hypothetical_replica_count,
6189                            });
6190                        }
6191                    } else if alter_strategy.is_some() {
6192                        // AlterClusterPlanStrategies that are not None will standup pending replicas of the new configuration
6193                        // and violate the single replica for sources constraint. If there are any storage objects (sources or sinks) we should
6194                        // just fail.
6195                        let internal_replica_count =
6196                            cluster.replicas().iter().filter(|r| r.internal()).count();
6197                        let hypothetical_replica_count = internal_replica_count * 2;
6198                        if contains_single_replica_objects(scx, cluster) {
6199                            return Err(PlanError::CreateReplicaFailStorageObjects {
6200                                current_replica_count: cluster.replica_ids().iter().count(),
6201                                internal_replica_count,
6202                                hypothetical_replica_count,
6203                            });
6204                        }
6205                    }
6206                }
6207                false => {
6208                    if !alter_strategy.is_none() {
6209                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6210                    }
6211                    if availability_zones.is_some() {
6212                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6213                    }
6214                    if replication_factor.is_some() {
6215                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6216                    }
6217                    if introspection_debugging.is_some() {
6218                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6219                    }
6220                    if introspection_interval.is_some() {
6221                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6222                    }
6223                    if size.is_some() {
6224                        sql_bail!("SIZE not supported for unmanaged clusters");
6225                    }
6226                    if disk.is_some() {
6227                        sql_bail!("DISK not supported for unmanaged clusters");
6228                    }
6229                    if schedule.is_some()
6230                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6231                    {
6232                        sql_bail!(
6233                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6234                        );
6235                    }
6236                    if let Some(current_schedule) = cluster.schedule() {
6237                        if !matches!(current_schedule, ClusterSchedule::Manual)
6238                            && schedule.is_none()
6239                        {
6240                            sql_bail!(
6241                                "when switching a cluster to unmanaged, if the managed \
6242                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6243                                explicitly set the SCHEDULE to MANUAL"
6244                            );
6245                        }
6246                    }
6247                }
6248            }
6249
6250            let mut replicas = vec![];
6251            for ReplicaDefinition { name, options } in
6252                replica_defs.into_iter().flat_map(Vec::into_iter)
6253            {
6254                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6255            }
6256
6257            if let Some(managed) = managed {
6258                options.managed = AlterOptionParameter::Set(managed);
6259            }
6260            if let Some(replication_factor) = replication_factor {
6261                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6262            }
6263            if let Some(size) = &size {
6264                options.size = AlterOptionParameter::Set(size.clone());
6265            }
6266            if let Some(availability_zones) = availability_zones {
6267                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6268            }
6269            if let Some(introspection_debugging) = introspection_debugging {
6270                options.introspection_debugging =
6271                    AlterOptionParameter::Set(introspection_debugging);
6272            }
6273            if let Some(introspection_interval) = introspection_interval {
6274                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6275            }
6276            if disk.is_some() {
6277                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6278                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
6279                // we'll be able to remove the `DISK` option entirely.
6280                let size = size.as_deref().unwrap_or_else(|| {
6281                    cluster.managed_size().expect("cluster known to be managed")
6282                });
6283                if scx.catalog.is_cluster_size_cc(size) {
6284                    sql_bail!(
6285                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6286                    );
6287                }
6288
6289                scx.catalog
6290                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6291            }
6292            if !replicas.is_empty() {
6293                options.replicas = AlterOptionParameter::Set(replicas);
6294            }
6295            if let Some(schedule) = schedule {
6296                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6297            }
6298            if let Some(workload_class) = workload_class {
6299                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6300            }
6301        }
6302        AlterClusterAction::ResetOptions(reset_options) => {
6303            use AlterOptionParameter::Reset;
6304            use ClusterOptionName::*;
6305
6306            if !scx.catalog.active_role_id().is_system() {
6307                if reset_options.contains(&WorkloadClass) {
6308                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6309                }
6310            }
6311
6312            for option in reset_options {
6313                match option {
6314                    AvailabilityZones => options.availability_zones = Reset,
6315                    Disk => scx
6316                        .catalog
6317                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6318                    IntrospectionInterval => options.introspection_interval = Reset,
6319                    IntrospectionDebugging => options.introspection_debugging = Reset,
6320                    Managed => options.managed = Reset,
6321                    Replicas => options.replicas = Reset,
6322                    ReplicationFactor => options.replication_factor = Reset,
6323                    Size => options.size = Reset,
6324                    Schedule => options.schedule = Reset,
6325                    WorkloadClass => options.workload_class = Reset,
6326                }
6327            }
6328        }
6329    }
6330    Ok(Plan::AlterCluster(AlterClusterPlan {
6331        id: cluster.id(),
6332        name: cluster.name().to_string(),
6333        options,
6334        strategy: alter_strategy,
6335    }))
6336}
6337
6338pub fn describe_alter_set_cluster(
6339    _: &StatementContext,
6340    _: AlterSetClusterStatement<Aug>,
6341) -> Result<StatementDesc, PlanError> {
6342    Ok(StatementDesc::new(None))
6343}
6344
6345pub fn plan_alter_item_set_cluster(
6346    scx: &StatementContext,
6347    AlterSetClusterStatement {
6348        if_exists,
6349        set_cluster: in_cluster_name,
6350        name,
6351        object_type,
6352    }: AlterSetClusterStatement<Aug>,
6353) -> Result<Plan, PlanError> {
6354    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6355
6356    let object_type = object_type.into();
6357
6358    // Prevent access to `SET CLUSTER` for unsupported objects.
6359    match object_type {
6360        ObjectType::MaterializedView => {}
6361        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6362            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6363        }
6364        _ => {
6365            bail_never_supported!(
6366                format!("ALTER {object_type} SET CLUSTER"),
6367                "sql/alter-set-cluster/",
6368                format!("{object_type} has no associated cluster")
6369            )
6370        }
6371    }
6372
6373    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6374
6375    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6376        Some(entry) => {
6377            let current_cluster = entry.cluster_id();
6378            let Some(current_cluster) = current_cluster else {
6379                sql_bail!("No cluster associated with {name}");
6380            };
6381
6382            if current_cluster == in_cluster.id() {
6383                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6384            } else {
6385                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6386                    id: entry.id(),
6387                    set_cluster: in_cluster.id(),
6388                }))
6389            }
6390        }
6391        None => {
6392            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6393                name: name.to_ast_string_simple(),
6394                object_type,
6395            });
6396
6397            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6398        }
6399    }
6400}
6401
6402pub fn describe_alter_object_rename(
6403    _: &StatementContext,
6404    _: AlterObjectRenameStatement,
6405) -> Result<StatementDesc, PlanError> {
6406    Ok(StatementDesc::new(None))
6407}
6408
6409pub fn plan_alter_object_rename(
6410    scx: &mut StatementContext,
6411    AlterObjectRenameStatement {
6412        name,
6413        object_type,
6414        to_item_name,
6415        if_exists,
6416    }: AlterObjectRenameStatement,
6417) -> Result<Plan, PlanError> {
6418    let object_type = object_type.into();
6419    match (object_type, name) {
6420        (
6421            ObjectType::View
6422            | ObjectType::MaterializedView
6423            | ObjectType::Table
6424            | ObjectType::Source
6425            | ObjectType::Index
6426            | ObjectType::Sink
6427            | ObjectType::Secret
6428            | ObjectType::Connection,
6429            UnresolvedObjectName::Item(name),
6430        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6431        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6432            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6433        }
6434        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6435            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6436        }
6437        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6438            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6439        }
6440        (object_type, name) => {
6441            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6442        }
6443    }
6444}
6445
6446pub fn plan_alter_schema_rename(
6447    scx: &mut StatementContext,
6448    name: UnresolvedSchemaName,
6449    to_schema_name: Ident,
6450    if_exists: bool,
6451) -> Result<Plan, PlanError> {
6452    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6453        let object_type = ObjectType::Schema;
6454        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6455            name: name.to_ast_string_simple(),
6456            object_type,
6457        });
6458        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6459    };
6460
6461    // Make sure the name is unique.
6462    if scx
6463        .resolve_schema_in_database(&db_spec, &to_schema_name)
6464        .is_ok()
6465    {
6466        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6467            to_schema_name.clone().into_string(),
6468        )));
6469    }
6470
6471    // Prevent users from renaming system related schemas.
6472    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6473    if schema.id().is_system() {
6474        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6475    }
6476
6477    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6478        cur_schema_spec: (db_spec, schema_spec),
6479        new_schema_name: to_schema_name.into_string(),
6480    }))
6481}
6482
6483pub fn plan_alter_schema_swap<F>(
6484    scx: &mut StatementContext,
6485    name_a: UnresolvedSchemaName,
6486    name_b: Ident,
6487    gen_temp_suffix: F,
6488) -> Result<Plan, PlanError>
6489where
6490    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6491{
6492    let schema_a = scx.resolve_schema(name_a.clone())?;
6493
6494    let db_spec = schema_a.database().clone();
6495    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6496        sql_bail!("cannot swap schemas that are in the ambient database");
6497    };
6498    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6499
6500    // We cannot swap system schemas.
6501    if schema_a.id().is_system() || schema_b.id().is_system() {
6502        bail_never_supported!("swapping a system schema".to_string())
6503    }
6504
6505    // Generate a temporary name we can swap schema_a to.
6506    //
6507    // 'check' returns if the temp schema name would be valid.
6508    let check = |temp_suffix: &str| {
6509        let mut temp_name = ident!("mz_schema_swap_");
6510        temp_name.append_lossy(temp_suffix);
6511        scx.resolve_schema_in_database(&db_spec, &temp_name)
6512            .is_err()
6513    };
6514    let temp_suffix = gen_temp_suffix(&check)?;
6515    let name_temp = format!("mz_schema_swap_{temp_suffix}");
6516
6517    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6518        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6519        schema_a_name: schema_a.name().schema.to_string(),
6520        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6521        schema_b_name: schema_b.name().schema.to_string(),
6522        name_temp,
6523    }))
6524}
6525
6526pub fn plan_alter_item_rename(
6527    scx: &mut StatementContext,
6528    object_type: ObjectType,
6529    name: UnresolvedItemName,
6530    to_item_name: Ident,
6531    if_exists: bool,
6532) -> Result<Plan, PlanError> {
6533    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6534        Ok(r) => r,
6535        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6536        Err(PlanError::MismatchedObjectType {
6537            name,
6538            is_type: ObjectType::MaterializedView,
6539            expected_type: ObjectType::View,
6540        }) => {
6541            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6542        }
6543        e => e?,
6544    };
6545
6546    match resolved {
6547        Some(entry) => {
6548            let full_name = scx.catalog.resolve_full_name(entry.name());
6549            let item_type = entry.item_type();
6550
6551            let proposed_name = QualifiedItemName {
6552                qualifiers: entry.name().qualifiers.clone(),
6553                item: to_item_name.clone().into_string(),
6554            };
6555
6556            // For PostgreSQL compatibility, items and types cannot have
6557            // overlapping names in a variety of situations. See the comment on
6558            // `CatalogItemType::conflicts_with_type` for details.
6559            let conflicting_type_exists;
6560            let conflicting_item_exists;
6561            if item_type == CatalogItemType::Type {
6562                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6563                conflicting_item_exists = scx
6564                    .catalog
6565                    .get_item_by_name(&proposed_name)
6566                    .map(|item| item.item_type().conflicts_with_type())
6567                    .unwrap_or(false);
6568            } else {
6569                conflicting_type_exists = item_type.conflicts_with_type()
6570                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6571                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6572            };
6573            if conflicting_type_exists || conflicting_item_exists {
6574                sql_bail!("catalog item '{}' already exists", to_item_name);
6575            }
6576
6577            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6578                id: entry.id(),
6579                current_full_name: full_name,
6580                to_name: normalize::ident(to_item_name),
6581                object_type,
6582            }))
6583        }
6584        None => {
6585            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6586                name: name.to_ast_string_simple(),
6587                object_type,
6588            });
6589
6590            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6591        }
6592    }
6593}
6594
6595pub fn plan_alter_cluster_rename(
6596    scx: &mut StatementContext,
6597    object_type: ObjectType,
6598    name: Ident,
6599    to_name: Ident,
6600    if_exists: bool,
6601) -> Result<Plan, PlanError> {
6602    match resolve_cluster(scx, &name, if_exists)? {
6603        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6604            id: entry.id(),
6605            name: entry.name().to_string(),
6606            to_name: ident(to_name),
6607        })),
6608        None => {
6609            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6610                name: name.to_ast_string_simple(),
6611                object_type,
6612            });
6613
6614            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6615        }
6616    }
6617}
6618
6619pub fn plan_alter_cluster_swap<F>(
6620    scx: &mut StatementContext,
6621    name_a: Ident,
6622    name_b: Ident,
6623    gen_temp_suffix: F,
6624) -> Result<Plan, PlanError>
6625where
6626    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6627{
6628    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6629    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6630
6631    let check = |temp_suffix: &str| {
6632        let mut temp_name = ident!("mz_schema_swap_");
6633        temp_name.append_lossy(temp_suffix);
6634        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6635            // Temp name does not exist, so we can use it.
6636            Err(CatalogError::UnknownCluster(_)) => true,
6637            // Temp name already exists!
6638            Ok(_) | Err(_) => false,
6639        }
6640    };
6641    let temp_suffix = gen_temp_suffix(&check)?;
6642    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6643
6644    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6645        id_a: cluster_a.id(),
6646        id_b: cluster_b.id(),
6647        name_a: name_a.into_string(),
6648        name_b: name_b.into_string(),
6649        name_temp,
6650    }))
6651}
6652
6653pub fn plan_alter_cluster_replica_rename(
6654    scx: &mut StatementContext,
6655    object_type: ObjectType,
6656    name: QualifiedReplica,
6657    to_item_name: Ident,
6658    if_exists: bool,
6659) -> Result<Plan, PlanError> {
6660    match resolve_cluster_replica(scx, &name, if_exists)? {
6661        Some((cluster, replica)) => {
6662            ensure_cluster_is_not_managed(scx, cluster.id())?;
6663            Ok(Plan::AlterClusterReplicaRename(
6664                AlterClusterReplicaRenamePlan {
6665                    cluster_id: cluster.id(),
6666                    replica_id: replica,
6667                    name: QualifiedReplica {
6668                        cluster: Ident::new(cluster.name())?,
6669                        replica: name.replica,
6670                    },
6671                    to_name: normalize::ident(to_item_name),
6672                },
6673            ))
6674        }
6675        None => {
6676            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6677                name: name.to_ast_string_simple(),
6678                object_type,
6679            });
6680
6681            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6682        }
6683    }
6684}
6685
6686pub fn describe_alter_object_swap(
6687    _: &StatementContext,
6688    _: AlterObjectSwapStatement,
6689) -> Result<StatementDesc, PlanError> {
6690    Ok(StatementDesc::new(None))
6691}
6692
6693pub fn plan_alter_object_swap(
6694    scx: &mut StatementContext,
6695    stmt: AlterObjectSwapStatement,
6696) -> Result<Plan, PlanError> {
6697    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6698
6699    let AlterObjectSwapStatement {
6700        object_type,
6701        name_a,
6702        name_b,
6703    } = stmt;
6704    let object_type = object_type.into();
6705
6706    // We'll try 10 times to generate a temporary suffix.
6707    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6708        let mut attempts = 0;
6709        let name_temp = loop {
6710            attempts += 1;
6711            if attempts > 10 {
6712                tracing::warn!("Unable to generate temp id for swapping");
6713                sql_bail!("unable to swap!");
6714            }
6715
6716            // Call the provided closure to make sure this name is unique!
6717            let short_id = mz_ore::id_gen::temp_id();
6718            if check_fn(&short_id) {
6719                break short_id;
6720            }
6721        };
6722
6723        Ok(name_temp)
6724    };
6725
6726    match (object_type, name_a, name_b) {
6727        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6728            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6729        }
6730        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6731            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6732        }
6733        (object_type, _, _) => Err(PlanError::Unsupported {
6734            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6735            discussion_no: None,
6736        }),
6737    }
6738}
6739
6740pub fn describe_alter_retain_history(
6741    _: &StatementContext,
6742    _: AlterRetainHistoryStatement<Aug>,
6743) -> Result<StatementDesc, PlanError> {
6744    Ok(StatementDesc::new(None))
6745}
6746
6747pub fn plan_alter_retain_history(
6748    scx: &StatementContext,
6749    AlterRetainHistoryStatement {
6750        object_type,
6751        if_exists,
6752        name,
6753        history,
6754    }: AlterRetainHistoryStatement<Aug>,
6755) -> Result<Plan, PlanError> {
6756    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6757}
6758
6759fn alter_retain_history(
6760    scx: &StatementContext,
6761    object_type: ObjectType,
6762    if_exists: bool,
6763    name: UnresolvedObjectName,
6764    history: Option<WithOptionValue<Aug>>,
6765) -> Result<Plan, PlanError> {
6766    let name = match (object_type, name) {
6767        (
6768            // View gets a special error below.
6769            ObjectType::View
6770            | ObjectType::MaterializedView
6771            | ObjectType::Table
6772            | ObjectType::Source
6773            | ObjectType::Index,
6774            UnresolvedObjectName::Item(name),
6775        ) => name,
6776        (object_type, _) => {
6777            sql_bail!("{object_type} does not support RETAIN HISTORY")
6778        }
6779    };
6780    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6781        Some(entry) => {
6782            let full_name = scx.catalog.resolve_full_name(entry.name());
6783            let item_type = entry.item_type();
6784
6785            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6786            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6787                return Err(PlanError::AlterViewOnMaterializedView(
6788                    full_name.to_string(),
6789                ));
6790            } else if object_type == ObjectType::View {
6791                sql_bail!("{object_type} does not support RETAIN HISTORY")
6792            } else if object_type != item_type {
6793                sql_bail!(
6794                    "\"{}\" is a {} not a {}",
6795                    full_name,
6796                    entry.item_type(),
6797                    format!("{object_type}").to_lowercase()
6798                )
6799            }
6800
6801            // Save the original value so we can write it back down in the create_sql catalog item.
6802            let (value, lcw) = match &history {
6803                Some(WithOptionValue::RetainHistoryFor(value)) => {
6804                    let window = OptionalDuration::try_from_value(value.clone())?;
6805                    (Some(value.clone()), window.0)
6806                }
6807                // None is RESET, so use the default CW.
6808                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6809                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6810            };
6811            let window = plan_retain_history(scx, lcw)?;
6812
6813            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6814                id: entry.id(),
6815                value,
6816                window,
6817                object_type,
6818            }))
6819        }
6820        None => {
6821            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6822                name: name.to_ast_string_simple(),
6823                object_type,
6824            });
6825
6826            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6827        }
6828    }
6829}
6830
6831pub fn describe_alter_secret_options(
6832    _: &StatementContext,
6833    _: AlterSecretStatement<Aug>,
6834) -> Result<StatementDesc, PlanError> {
6835    Ok(StatementDesc::new(None))
6836}
6837
6838pub fn plan_alter_secret(
6839    scx: &mut StatementContext,
6840    stmt: AlterSecretStatement<Aug>,
6841) -> Result<Plan, PlanError> {
6842    let AlterSecretStatement {
6843        name,
6844        if_exists,
6845        value,
6846    } = stmt;
6847    let object_type = ObjectType::Secret;
6848    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6849        Some(entry) => entry.id(),
6850        None => {
6851            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6852                name: name.to_string(),
6853                object_type,
6854            });
6855
6856            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6857        }
6858    };
6859
6860    let secret_as = query::plan_secret_as(scx, value)?;
6861
6862    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6863}
6864
6865pub fn describe_alter_connection(
6866    _: &StatementContext,
6867    _: AlterConnectionStatement<Aug>,
6868) -> Result<StatementDesc, PlanError> {
6869    Ok(StatementDesc::new(None))
6870}
6871
6872generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6873
6874pub fn plan_alter_connection(
6875    scx: &StatementContext,
6876    stmt: AlterConnectionStatement<Aug>,
6877) -> Result<Plan, PlanError> {
6878    let AlterConnectionStatement {
6879        name,
6880        if_exists,
6881        actions,
6882        with_options,
6883    } = stmt;
6884    let conn_name = normalize::unresolved_item_name(name)?;
6885    let entry = match scx.catalog.resolve_item(&conn_name) {
6886        Ok(entry) => entry,
6887        Err(_) if if_exists => {
6888            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6889                name: conn_name.to_string(),
6890                object_type: ObjectType::Sink,
6891            });
6892
6893            return Ok(Plan::AlterNoop(AlterNoopPlan {
6894                object_type: ObjectType::Connection,
6895            }));
6896        }
6897        Err(e) => return Err(e.into()),
6898    };
6899
6900    let connection = entry.connection()?;
6901
6902    if actions
6903        .iter()
6904        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6905    {
6906        if actions.len() > 1 {
6907            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6908        }
6909
6910        if !with_options.is_empty() {
6911            sql_bail!(
6912                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6913                with_options
6914                    .iter()
6915                    .map(|o| o.to_ast_string_simple())
6916                    .join(", ")
6917            );
6918        }
6919
6920        if !matches!(connection, Connection::Ssh(_)) {
6921            sql_bail!(
6922                "{} is not an SSH connection",
6923                scx.catalog.resolve_full_name(entry.name())
6924            )
6925        }
6926
6927        return Ok(Plan::AlterConnection(AlterConnectionPlan {
6928            id: entry.id(),
6929            action: crate::plan::AlterConnectionAction::RotateKeys,
6930        }));
6931    }
6932
6933    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6934    if options.validate.is_some() {
6935        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6936    }
6937
6938    let validate = match options.validate {
6939        Some(val) => val,
6940        None => {
6941            scx.catalog
6942                .system_vars()
6943                .enable_default_connection_validation()
6944                && connection.validate_by_default()
6945        }
6946    };
6947
6948    let connection_type = match connection {
6949        Connection::Aws(_) => CreateConnectionType::Aws,
6950        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6951        Connection::Kafka(_) => CreateConnectionType::Kafka,
6952        Connection::Csr(_) => CreateConnectionType::Csr,
6953        Connection::Postgres(_) => CreateConnectionType::Postgres,
6954        Connection::Ssh(_) => CreateConnectionType::Ssh,
6955        Connection::MySql(_) => CreateConnectionType::MySql,
6956        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6957        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
6958    };
6959
6960    // Collect all options irrespective of action taken on them.
6961    let specified_options: BTreeSet<_> = actions
6962        .iter()
6963        .map(|action: &AlterConnectionAction<Aug>| match action {
6964            AlterConnectionAction::SetOption(option) => option.name.clone(),
6965            AlterConnectionAction::DropOption(name) => name.clone(),
6966            AlterConnectionAction::RotateKeys => unreachable!(),
6967        })
6968        .collect();
6969
6970    for invalid in INALTERABLE_OPTIONS {
6971        if specified_options.contains(invalid) {
6972            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6973        }
6974    }
6975
6976    connection::validate_options_per_connection_type(connection_type, specified_options)?;
6977
6978    // Partition operations into set and drop
6979    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
6980        actions.into_iter().partition_map(|action| match action {
6981            AlterConnectionAction::SetOption(option) => Either::Left(option),
6982            AlterConnectionAction::DropOption(name) => Either::Right(name),
6983            AlterConnectionAction::RotateKeys => unreachable!(),
6984        });
6985
6986    let set_options: BTreeMap<_, _> = set_options_vec
6987        .clone()
6988        .into_iter()
6989        .map(|option| (option.name, option.value))
6990        .collect();
6991
6992    // Type check values + avoid duplicates; we don't want to e.g. let users
6993    // drop and set the same option in the same statement, so treating drops as
6994    // sets here is fine.
6995    let connection_options_extracted =
6996        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
6997
6998    let duplicates: Vec<_> = connection_options_extracted
6999        .seen
7000        .intersection(&drop_options)
7001        .collect();
7002
7003    if !duplicates.is_empty() {
7004        sql_bail!(
7005            "cannot both SET and DROP/RESET options {}",
7006            duplicates
7007                .iter()
7008                .map(|option| option.to_string())
7009                .join(", ")
7010        )
7011    }
7012
7013    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7014        let set_options_count = mutually_exclusive_options
7015            .iter()
7016            .filter(|o| set_options.contains_key(o))
7017            .count();
7018        let drop_options_count = mutually_exclusive_options
7019            .iter()
7020            .filter(|o| drop_options.contains(o))
7021            .count();
7022
7023        // Disallow setting _and_ resetting mutually exclusive options
7024        if set_options_count > 0 && drop_options_count > 0 {
7025            sql_bail!(
7026                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7027                connection_type,
7028                mutually_exclusive_options
7029                    .iter()
7030                    .map(|option| option.to_string())
7031                    .join(", ")
7032            )
7033        }
7034
7035        // If any option is either set or dropped, ensure all mutually exclusive
7036        // options are dropped. We do this "behind the scenes", even though we
7037        // disallow users from performing the same action because this is the
7038        // mechanism by which we overwrite values elsewhere in the code.
7039        if set_options_count > 0 || drop_options_count > 0 {
7040            drop_options.extend(mutually_exclusive_options.iter().cloned());
7041        }
7042
7043        // n.b. if mutually exclusive options are set, those will error when we
7044        // try to replan the connection.
7045    }
7046
7047    Ok(Plan::AlterConnection(AlterConnectionPlan {
7048        id: entry.id(),
7049        action: crate::plan::AlterConnectionAction::AlterOptions {
7050            set_options,
7051            drop_options,
7052            validate,
7053        },
7054    }))
7055}
7056
7057pub fn describe_alter_sink(
7058    _: &StatementContext,
7059    _: AlterSinkStatement<Aug>,
7060) -> Result<StatementDesc, PlanError> {
7061    Ok(StatementDesc::new(None))
7062}
7063
7064pub fn plan_alter_sink(
7065    scx: &mut StatementContext,
7066    stmt: AlterSinkStatement<Aug>,
7067) -> Result<Plan, PlanError> {
7068    let AlterSinkStatement {
7069        sink_name,
7070        if_exists,
7071        action,
7072    } = stmt;
7073
7074    let object_type = ObjectType::Sink;
7075    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7076
7077    let Some(item) = item else {
7078        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7079            name: sink_name.to_string(),
7080            object_type,
7081        });
7082
7083        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7084    };
7085    // Always ALTER objects from their latest version.
7086    let item = item.at_version(RelationVersionSelector::Latest);
7087
7088    match action {
7089        AlterSinkAction::ChangeRelation(new_from) => {
7090            // First we reconstruct the original CREATE SINK statement
7091            let create_sql = item.create_sql();
7092            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7093            let [stmt]: [StatementParseResult; 1] = stmts
7094                .try_into()
7095                .expect("create sql of sink was not exactly one statement");
7096            let Statement::CreateSink(stmt) = stmt.ast else {
7097                unreachable!("invalid create SQL for sink item");
7098            };
7099
7100            // Then resolve and swap the resolved from relation to the new one
7101            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7102            stmt.from = new_from;
7103
7104            // Finally re-plan the modified create sink statement to verify the new configuration is valid
7105            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7106                unreachable!("invalid plan for CREATE SINK statement");
7107            };
7108
7109            plan.sink.version += 1;
7110
7111            Ok(Plan::AlterSink(AlterSinkPlan {
7112                item_id: item.id(),
7113                global_id: item.global_id(),
7114                sink: plan.sink,
7115                with_snapshot: plan.with_snapshot,
7116                in_cluster: plan.in_cluster,
7117            }))
7118        }
7119        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7120        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7121    }
7122}
7123
7124pub fn describe_alter_source(
7125    _: &StatementContext,
7126    _: AlterSourceStatement<Aug>,
7127) -> Result<StatementDesc, PlanError> {
7128    // TODO: put the options here, right?
7129    Ok(StatementDesc::new(None))
7130}
7131
7132generate_extracted_config!(
7133    AlterSourceAddSubsourceOption,
7134    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7135    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7136    (Details, String)
7137);
7138
7139pub fn plan_alter_source(
7140    scx: &mut StatementContext,
7141    stmt: AlterSourceStatement<Aug>,
7142) -> Result<Plan, PlanError> {
7143    let AlterSourceStatement {
7144        source_name,
7145        if_exists,
7146        action,
7147    } = stmt;
7148    let object_type = ObjectType::Source;
7149
7150    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7151        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7152            name: source_name.to_string(),
7153            object_type,
7154        });
7155
7156        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7157    }
7158
7159    match action {
7160        AlterSourceAction::SetOptions(options) => {
7161            let mut options = options.into_iter();
7162            let option = options.next().unwrap();
7163            if option.name == CreateSourceOptionName::RetainHistory {
7164                if options.next().is_some() {
7165                    sql_bail!("RETAIN HISTORY must be only option");
7166                }
7167                return alter_retain_history(
7168                    scx,
7169                    object_type,
7170                    if_exists,
7171                    UnresolvedObjectName::Item(source_name),
7172                    option.value,
7173                );
7174            }
7175            // n.b we use this statement in purification in a way that cannot be
7176            // planned directly.
7177            sql_bail!(
7178                "Cannot modify the {} of a SOURCE.",
7179                option.name.to_ast_string_simple()
7180            );
7181        }
7182        AlterSourceAction::ResetOptions(reset) => {
7183            let mut options = reset.into_iter();
7184            let option = options.next().unwrap();
7185            if option == CreateSourceOptionName::RetainHistory {
7186                if options.next().is_some() {
7187                    sql_bail!("RETAIN HISTORY must be only option");
7188                }
7189                return alter_retain_history(
7190                    scx,
7191                    object_type,
7192                    if_exists,
7193                    UnresolvedObjectName::Item(source_name),
7194                    None,
7195                );
7196            }
7197            sql_bail!(
7198                "Cannot modify the {} of a SOURCE.",
7199                option.to_ast_string_simple()
7200            );
7201        }
7202        AlterSourceAction::DropSubsources { .. } => {
7203            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7204        }
7205        AlterSourceAction::AddSubsources { .. } => {
7206            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7207        }
7208        AlterSourceAction::RefreshReferences => {
7209            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7210        }
7211    };
7212}
7213
7214pub fn describe_alter_system_set(
7215    _: &StatementContext,
7216    _: AlterSystemSetStatement,
7217) -> Result<StatementDesc, PlanError> {
7218    Ok(StatementDesc::new(None))
7219}
7220
7221pub fn plan_alter_system_set(
7222    _: &StatementContext,
7223    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7224) -> Result<Plan, PlanError> {
7225    let name = name.to_string();
7226    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7227        name,
7228        value: scl::plan_set_variable_to(to)?,
7229    }))
7230}
7231
7232pub fn describe_alter_system_reset(
7233    _: &StatementContext,
7234    _: AlterSystemResetStatement,
7235) -> Result<StatementDesc, PlanError> {
7236    Ok(StatementDesc::new(None))
7237}
7238
7239pub fn plan_alter_system_reset(
7240    _: &StatementContext,
7241    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7242) -> Result<Plan, PlanError> {
7243    let name = name.to_string();
7244    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7245}
7246
7247pub fn describe_alter_system_reset_all(
7248    _: &StatementContext,
7249    _: AlterSystemResetAllStatement,
7250) -> Result<StatementDesc, PlanError> {
7251    Ok(StatementDesc::new(None))
7252}
7253
7254pub fn plan_alter_system_reset_all(
7255    _: &StatementContext,
7256    _: AlterSystemResetAllStatement,
7257) -> Result<Plan, PlanError> {
7258    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7259}
7260
7261pub fn describe_alter_role(
7262    _: &StatementContext,
7263    _: AlterRoleStatement<Aug>,
7264) -> Result<StatementDesc, PlanError> {
7265    Ok(StatementDesc::new(None))
7266}
7267
7268pub fn plan_alter_role(
7269    scx: &StatementContext,
7270    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7271) -> Result<Plan, PlanError> {
7272    let option = match option {
7273        AlterRoleOption::Attributes(attrs) => {
7274            let attrs = plan_role_attributes(attrs, scx)?;
7275            PlannedAlterRoleOption::Attributes(attrs)
7276        }
7277        AlterRoleOption::Variable(variable) => {
7278            let var = plan_role_variable(variable)?;
7279            PlannedAlterRoleOption::Variable(var)
7280        }
7281    };
7282
7283    Ok(Plan::AlterRole(AlterRolePlan {
7284        id: name.id,
7285        name: name.name,
7286        option,
7287    }))
7288}
7289
7290pub fn describe_alter_table_add_column(
7291    _: &StatementContext,
7292    _: AlterTableAddColumnStatement<Aug>,
7293) -> Result<StatementDesc, PlanError> {
7294    Ok(StatementDesc::new(None))
7295}
7296
7297pub fn plan_alter_table_add_column(
7298    scx: &StatementContext,
7299    stmt: AlterTableAddColumnStatement<Aug>,
7300) -> Result<Plan, PlanError> {
7301    let AlterTableAddColumnStatement {
7302        if_exists,
7303        name,
7304        if_col_not_exist,
7305        column_name,
7306        data_type,
7307    } = stmt;
7308    let object_type = ObjectType::Table;
7309
7310    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7311
7312    let (relation_id, item_name, desc) =
7313        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7314            Some(item) => {
7315                // Always add columns to the latest version of the item.
7316                let item_name = scx.catalog.resolve_full_name(item.name());
7317                let item = item.at_version(RelationVersionSelector::Latest);
7318                let desc = item.desc(&item_name)?.into_owned();
7319                (item.id(), item_name, desc)
7320            }
7321            None => {
7322                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7323                    name: name.to_ast_string_simple(),
7324                    object_type,
7325                });
7326                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7327            }
7328        };
7329
7330    let column_name = ColumnName::from(column_name.as_str());
7331    if desc.get_by_name(&column_name).is_some() {
7332        if if_col_not_exist {
7333            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7334                column_name: column_name.to_string(),
7335                object_name: item_name.item,
7336            });
7337            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7338        } else {
7339            return Err(PlanError::ColumnAlreadyExists {
7340                column_name,
7341                object_name: item_name.item,
7342            });
7343        }
7344    }
7345
7346    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7347    // TODO(alter_table): Support non-nullable columns with default values.
7348    let column_type = scalar_type.nullable(true);
7349    // "unresolve" our data type so we can later update the persisted create_sql.
7350    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7351
7352    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7353        relation_id,
7354        column_name,
7355        column_type,
7356        raw_sql_type,
7357    }))
7358}
7359
7360pub fn describe_alter_materialized_view_apply_replacement(
7361    _: &StatementContext,
7362    _: AlterMaterializedViewApplyReplacementStatement,
7363) -> Result<StatementDesc, PlanError> {
7364    Ok(StatementDesc::new(None))
7365}
7366
7367pub fn plan_alter_materialized_view_apply_replacement(
7368    scx: &StatementContext,
7369    stmt: AlterMaterializedViewApplyReplacementStatement,
7370) -> Result<Plan, PlanError> {
7371    let AlterMaterializedViewApplyReplacementStatement {
7372        if_exists,
7373        name,
7374        replacement_name,
7375    } = stmt;
7376
7377    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7378
7379    let object_type = ObjectType::MaterializedView;
7380    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7381        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7382            name: name.to_ast_string_simple(),
7383            object_type,
7384        });
7385        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7386    };
7387
7388    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7389        .expect("if_exists not set");
7390
7391    if replacement.replacement_target() != Some(mv.id()) {
7392        return Err(PlanError::InvalidReplacement {
7393            item_type: mv.item_type(),
7394            item_name: scx.catalog.minimal_qualification(mv.name()),
7395            replacement_type: replacement.item_type(),
7396            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7397        });
7398    }
7399
7400    Ok(Plan::AlterMaterializedViewApplyReplacement(
7401        AlterMaterializedViewApplyReplacementPlan {
7402            id: mv.id(),
7403            replacement_id: replacement.id(),
7404        },
7405    ))
7406}
7407
7408pub fn describe_comment(
7409    _: &StatementContext,
7410    _: CommentStatement<Aug>,
7411) -> Result<StatementDesc, PlanError> {
7412    Ok(StatementDesc::new(None))
7413}
7414
7415pub fn plan_comment(
7416    scx: &mut StatementContext,
7417    stmt: CommentStatement<Aug>,
7418) -> Result<Plan, PlanError> {
7419    const MAX_COMMENT_LENGTH: usize = 1024;
7420
7421    let CommentStatement { object, comment } = stmt;
7422
7423    // TODO(parkmycar): Make max comment length configurable.
7424    if let Some(c) = &comment {
7425        if c.len() > 1024 {
7426            return Err(PlanError::CommentTooLong {
7427                length: c.len(),
7428                max_size: MAX_COMMENT_LENGTH,
7429            });
7430        }
7431    }
7432
7433    let (object_id, column_pos) = match &object {
7434        com_ty @ CommentObjectType::Table { name }
7435        | com_ty @ CommentObjectType::View { name }
7436        | com_ty @ CommentObjectType::MaterializedView { name }
7437        | com_ty @ CommentObjectType::Index { name }
7438        | com_ty @ CommentObjectType::Func { name }
7439        | com_ty @ CommentObjectType::Connection { name }
7440        | com_ty @ CommentObjectType::Source { name }
7441        | com_ty @ CommentObjectType::Sink { name }
7442        | com_ty @ CommentObjectType::Secret { name }
7443        | com_ty @ CommentObjectType::ContinualTask { name } => {
7444            let item = scx.get_item_by_resolved_name(name)?;
7445            match (com_ty, item.item_type()) {
7446                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7447                    (CommentObjectId::Table(item.id()), None)
7448                }
7449                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7450                    (CommentObjectId::View(item.id()), None)
7451                }
7452                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7453                    (CommentObjectId::MaterializedView(item.id()), None)
7454                }
7455                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7456                    (CommentObjectId::Index(item.id()), None)
7457                }
7458                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7459                    (CommentObjectId::Func(item.id()), None)
7460                }
7461                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7462                    (CommentObjectId::Connection(item.id()), None)
7463                }
7464                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7465                    (CommentObjectId::Source(item.id()), None)
7466                }
7467                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7468                    (CommentObjectId::Sink(item.id()), None)
7469                }
7470                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7471                    (CommentObjectId::Secret(item.id()), None)
7472                }
7473                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7474                    (CommentObjectId::ContinualTask(item.id()), None)
7475                }
7476                (com_ty, cat_ty) => {
7477                    let expected_type = match com_ty {
7478                        CommentObjectType::Table { .. } => ObjectType::Table,
7479                        CommentObjectType::View { .. } => ObjectType::View,
7480                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7481                        CommentObjectType::Index { .. } => ObjectType::Index,
7482                        CommentObjectType::Func { .. } => ObjectType::Func,
7483                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7484                        CommentObjectType::Source { .. } => ObjectType::Source,
7485                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7486                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7487                        _ => unreachable!("these are the only types we match on"),
7488                    };
7489
7490                    return Err(PlanError::InvalidObjectType {
7491                        expected_type: SystemObjectType::Object(expected_type),
7492                        actual_type: SystemObjectType::Object(cat_ty.into()),
7493                        object_name: item.name().item.clone(),
7494                    });
7495                }
7496            }
7497        }
7498        CommentObjectType::Type { ty } => match ty {
7499            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7500                sql_bail!("cannot comment on anonymous list or map type");
7501            }
7502            ResolvedDataType::Named { id, modifiers, .. } => {
7503                if !modifiers.is_empty() {
7504                    sql_bail!("cannot comment on type with modifiers");
7505                }
7506                (CommentObjectId::Type(*id), None)
7507            }
7508            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7509        },
7510        CommentObjectType::Column { name } => {
7511            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7512            match item.item_type() {
7513                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7514                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7515                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7516                CatalogItemType::MaterializedView => {
7517                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7518                }
7519                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7520                r => {
7521                    return Err(PlanError::Unsupported {
7522                        feature: format!("Specifying comments on a column of {r}"),
7523                        discussion_no: None,
7524                    });
7525                }
7526            }
7527        }
7528        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7529        CommentObjectType::Database { name } => {
7530            (CommentObjectId::Database(*name.database_id()), None)
7531        }
7532        CommentObjectType::Schema { name } => (
7533            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7534            None,
7535        ),
7536        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7537        CommentObjectType::ClusterReplica { name } => {
7538            let replica = scx.catalog.resolve_cluster_replica(name)?;
7539            (
7540                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7541                None,
7542            )
7543        }
7544        CommentObjectType::NetworkPolicy { name } => {
7545            (CommentObjectId::NetworkPolicy(name.id), None)
7546        }
7547    };
7548
7549    // Note: the `mz_comments` table uses an `Int4` for the column position, but in the catalog storage we
7550    // store a `usize` which would be a `Uint8`. We guard against a safe conversion here because
7551    // it's the easiest place to raise an error.
7552    //
7553    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7554    if let Some(p) = column_pos {
7555        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7556            max_num_columns: MAX_NUM_COLUMNS,
7557            req_num_columns: p,
7558        })?;
7559    }
7560
7561    Ok(Plan::Comment(CommentPlan {
7562        object_id,
7563        sub_component: column_pos,
7564        comment,
7565    }))
7566}
7567
7568pub(crate) fn resolve_cluster<'a>(
7569    scx: &'a StatementContext,
7570    name: &'a Ident,
7571    if_exists: bool,
7572) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7573    match scx.resolve_cluster(Some(name)) {
7574        Ok(cluster) => Ok(Some(cluster)),
7575        Err(_) if if_exists => Ok(None),
7576        Err(e) => Err(e),
7577    }
7578}
7579
7580pub(crate) fn resolve_cluster_replica<'a>(
7581    scx: &'a StatementContext,
7582    name: &QualifiedReplica,
7583    if_exists: bool,
7584) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7585    match scx.resolve_cluster(Some(&name.cluster)) {
7586        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7587            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7588            None if if_exists => Ok(None),
7589            None => Err(sql_err!(
7590                "CLUSTER {} has no CLUSTER REPLICA named {}",
7591                cluster.name(),
7592                name.replica.as_str().quoted(),
7593            )),
7594        },
7595        Err(_) if if_exists => Ok(None),
7596        Err(e) => Err(e),
7597    }
7598}
7599
7600pub(crate) fn resolve_database<'a>(
7601    scx: &'a StatementContext,
7602    name: &'a UnresolvedDatabaseName,
7603    if_exists: bool,
7604) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7605    match scx.resolve_database(name) {
7606        Ok(database) => Ok(Some(database)),
7607        Err(_) if if_exists => Ok(None),
7608        Err(e) => Err(e),
7609    }
7610}
7611
7612pub(crate) fn resolve_schema<'a>(
7613    scx: &'a StatementContext,
7614    name: UnresolvedSchemaName,
7615    if_exists: bool,
7616) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7617    match scx.resolve_schema(name) {
7618        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7619        Err(_) if if_exists => Ok(None),
7620        Err(e) => Err(e),
7621    }
7622}
7623
7624pub(crate) fn resolve_network_policy<'a>(
7625    scx: &'a StatementContext,
7626    name: Ident,
7627    if_exists: bool,
7628) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7629    match scx.catalog.resolve_network_policy(&name.to_string()) {
7630        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7631            id: policy.id(),
7632            name: policy.name().to_string(),
7633        })),
7634        Err(_) if if_exists => Ok(None),
7635        Err(e) => Err(e.into()),
7636    }
7637}
7638
7639pub(crate) fn resolve_item_or_type<'a>(
7640    scx: &'a StatementContext,
7641    object_type: ObjectType,
7642    name: UnresolvedItemName,
7643    if_exists: bool,
7644) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7645    let name = normalize::unresolved_item_name(name)?;
7646    let catalog_item = match object_type {
7647        ObjectType::Type => scx.catalog.resolve_type(&name),
7648        _ => scx.catalog.resolve_item(&name),
7649    };
7650
7651    match catalog_item {
7652        Ok(item) => {
7653            let is_type = ObjectType::from(item.item_type());
7654            if object_type == is_type {
7655                Ok(Some(item))
7656            } else {
7657                Err(PlanError::MismatchedObjectType {
7658                    name: scx.catalog.minimal_qualification(item.name()),
7659                    is_type,
7660                    expected_type: object_type,
7661                })
7662            }
7663        }
7664        Err(_) if if_exists => Ok(None),
7665        Err(e) => Err(e.into()),
7666    }
7667}
7668
7669/// Returns an error if the given cluster is a managed cluster
7670fn ensure_cluster_is_not_managed(
7671    scx: &StatementContext,
7672    cluster_id: ClusterId,
7673) -> Result<(), PlanError> {
7674    let cluster = scx.catalog.get_cluster(cluster_id);
7675    if cluster.is_managed() {
7676        Err(PlanError::ManagedCluster {
7677            cluster_name: cluster.name().to_string(),
7678        })
7679    } else {
7680        Ok(())
7681    }
7682}