mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
    WebhookValidation, literal, plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

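/// Matches the names that managed cluster replicas are created with: `r`
/// followed by one or more digits, e.g. `r1` or `r42`, but not `r`, `r1a`, or
/// `replica1`.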
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns have types with a meaningful Persist-level ordering.
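///
/// For example (an illustrative sketch with hypothetical column names): for a
/// relation with columns `(a, b)`, `PARTITION BY (a)` satisfies the prefix
/// check, while `PARTITION BY (b)` is rejected because `b` is not the column
/// at index 0, and `PARTITION BY (a, c)` is rejected because `c` does not
/// match the column at index 1.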
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

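/// Plans `CREATE DATABASE`. For example (a sketch), `CREATE DATABASE db`
/// plans to a `CreateDatabasePlan` with `name: "db"` (normalized by
/// `normalize::ident`) and `if_not_exists: false`.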
pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

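/// Plans `CREATE SCHEMA`. Schema names may have at most two components: for
/// example (a sketch), `CREATE SCHEMA db.s` resolves `db` explicitly, while
/// `CREATE SCHEMA s` falls back to the session's active database and fails
/// if no active database is set.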
pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

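/// Plans `CREATE TABLE`. For example (an illustrative sketch; non-empty key
/// sets additionally require the `UNSAFE_ENABLE_TABLE_KEYS` feature flag):
/// `CREATE TABLE t (a int PRIMARY KEY, b text)` yields a relation whose
/// column `a` is non-nullable and whose key set contains `[0]`.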
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

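// Each `generate_extracted_config!` invocation below derives a struct named
// `<Option>Extracted` (e.g. `CreateSourceOptionExtracted`) with one typed
// field per option plus a `seen` set recording which options were provided,
// along with a `TryFrom<Vec<_>>` impl that rejects options specified more
// than once. A rough sketch of how the planner consumes it, mirroring its
// use further below:
//
//     let CreateSourceOptionExtracted { timestamp_interval, retain_history, seen: _ } =
//         CreateSourceOptionExtracted::try_from(with_options.clone())?;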
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

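/// Plans `CREATE SOURCE ... FROM WEBHOOK`. For example (a sketch, assuming
/// `BODY FORMAT JSON` maps to a `jsonb` column): `CREATE SOURCE s FROM
/// WEBHOOK BODY FORMAT JSON INCLUDE HEADERS` produces a relation with a
/// non-nullable `body` column followed by a `headers` map column.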
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost certainly wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all the options related to the
    // primary source output must be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

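/// Plans a load generator source connection, converting `TICK INTERVAL` to
/// microseconds and validating the offset options; for example (a sketch),
/// `AS OF 100 UP TO 50` is rejected because `UP TO` may not be less than
/// `AS OF`.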
fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // text/exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

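/// Plans a Postgres source connection. The purification-supplied `DETAILS`
/// option carries a hex-encoded `ProtoPostgresSourcePublicationDetails`
/// protobuf, which is decoded back into `PostgresSourcePublicationDetails`
/// here.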
fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // text columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        // exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

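/// Plans a Kafka source connection. For example (an illustrative sketch),
/// `INCLUDE PARTITION AS kafka_partition, OFFSET` yields metadata columns
/// named `kafka_partition` and `offset`; `INCLUDE KEY` is deliberately
/// skipped here and handled by the key envelope logic instead.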
fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _, // purified into `start_offset`
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // handled below
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

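/// Derives the relation description, planned envelope, and optional encoding
/// for the primary export of a source. When a format is specified, the
/// upstream value description must be a single `bytes` column that decoding
/// replaces; for example (a sketch), `FORMAT AVRO ...` swaps that column for
/// the columns of the decoded Avro schema.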
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is overly conservative.
            // For example, in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable, but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

1347    // The KEY VALUE load generator is the only UPSERT source that
1348    // has no encoding but defaults to `INCLUDE KEY`.
1349    //
1350    // As discussed
1351    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1352    // removing this special case amounts to deciding how to handle null keys
1353    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
1354    // this special case in.
1355    //
1356    // Note that this is safe because this generator is
1357    // 1. The only source with no encoding that can have its key included.
1358    // 2. Never produces null keys (or values, for that matter).
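    //
    // For example (illustrative only, approximate syntax):
    //
    //     CREATE SOURCE kv FROM LOAD GENERATOR KEY VALUE (
    //         KEYS 16, SNAPSHOT ROUNDS 1, VALUE SIZE 8,
    //         SEED 0, PARTITIONS 4, BATCH SIZE 2
    //     ) INCLUDE KEY ENVELOPE UPSERT;
    //
    // is the kind of statement this special case exists to support.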
1359    let key_envelope_no_encoding = matches!(
1360        source_connection,
1361        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1362            load_generator: LoadGenerator::KeyValue(_),
1363            ..
1364        })
1365    );
1366    let mut key_envelope = get_key_envelope(
1367        include_metadata,
1368        encoding.as_ref(),
1369        key_envelope_no_encoding,
1370    )?;
1371
1372    match (&envelope, &key_envelope) {
1373        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1374        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1375            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1376        ),
1377        _ => {}
1378    };
1379
1380    // Not all source envelopes are compatible with all source connections.
1381    // Whoever constructs the source ingestion pipeline is responsible for
1382    // choosing compatible envelopes and connections.
1383    //
1384    // TODO(guswynn): unambiguously assert which connections and envelopes are
1385    // compatible in typechecking
1386    //
1387    // TODO: remove bails as more support for upsert is added.
1388    let envelope = match &envelope {
1389        // TODO: fixup key envelope
1390        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1391        ast::SourceEnvelope::Debezium => {
1392            // TODO: check that key envelope is not set
1393            let after_idx = match typecheck_debezium(&value_desc) {
1394                Ok((_before_idx, after_idx)) => Ok(after_idx),
1395                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1396                    Some(DataEncoding::Avro(_)) => Err(type_err),
1397                    _ => Err(sql_err!(
1398                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1399                    )),
1400                },
1401            }?;
1402
1403            UnplannedSourceEnvelope::Upsert {
1404                style: UpsertStyle::Debezium { after_idx },
1405            }
1406        }
1407        ast::SourceEnvelope::Upsert {
1408            value_decode_err_policy,
1409        } => {
1410            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1411                None => {
1412                    if !key_envelope_no_encoding {
1413                        bail_unsupported!(format!(
1414                            "UPSERT requires a key/value format: {:?}",
1415                            format
1416                        ))
1417                    }
1418                    None
1419                }
1420                Some(key_encoding) => Some(key_encoding),
1421            };
1422            // `ENVELOPE UPSERT` implies `INCLUDE KEY` if it is not explicitly
1423            // specified.
1424            if key_envelope == KeyEnvelope::None {
1425                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1426            }
1427            // If the value decode error policy is not set, we use the default upsert style.
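            // e.g. (illustrative): an inline policy with alias `err` plans to
            // `UpsertStyle::ValueErrInline` with error column "err"; without an
            // alias the error column is named "error".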
1428            let style = match value_decode_err_policy.as_slice() {
1429                [] => UpsertStyle::Default(key_envelope),
1430                [SourceErrorPolicy::Inline { alias }] => {
1431                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1432                    UpsertStyle::ValueErrInline {
1433                        key_envelope,
1434                        error_column: alias
1435                            .as_ref()
1436                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1437                    }
1438                }
1439                _ => {
1440                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1441                }
1442            };
1443
1444            UnplannedSourceEnvelope::Upsert { style }
1445        }
1446        ast::SourceEnvelope::CdcV2 => {
1447            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1448            // TODO: check that key envelope is not set
1449            match format {
1450                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1451                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1452            }
1453            UnplannedSourceEnvelope::CdcV2
1454        }
1455    };
1456
1457    let metadata_desc = included_column_desc(metadata_columns_desc);
1458    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1459
1460    Ok((desc, envelope, encoding))
1461}
1462
1463/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1464/// of columns and constraints.
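///
/// For example (illustrative only, approximate syntax), the column and
/// constraint clauses of a statement like
///
/// ```sql
/// CREATE SUBSOURCE t (a int NOT NULL, b text, PRIMARY KEY (a)) OF SOURCE src ...
/// ```
///
/// are planned here into a `RelationDesc` whose primary key is `[a]`.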
1465fn plan_source_export_desc(
1466    scx: &StatementContext,
1467    name: &UnresolvedItemName,
1468    columns: &Vec<ColumnDef<Aug>>,
1469    constraints: &Vec<TableConstraint<Aug>>,
1470) -> Result<RelationDesc, PlanError> {
1471    let names: Vec<_> = columns
1472        .iter()
1473        .map(|c| normalize::column_name(c.name.clone()))
1474        .collect();
1475
1476    if let Some(dup) = names.iter().duplicates().next() {
1477        sql_bail!("column {} specified more than once", dup.quoted());
1478    }
1479
1480    // Build initial relation type that handles declared data types
1481    // and NOT NULL constraints.
1482    let mut column_types = Vec::with_capacity(columns.len());
1483    let mut keys = Vec::new();
1484
1485    for (i, c) in columns.into_iter().enumerate() {
1486        let aug_data_type = &c.data_type;
1487        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1488        let mut nullable = true;
1489        for option in &c.options {
1490            match &option.option {
1491                ColumnOption::NotNull => nullable = false,
1492                ColumnOption::Default(_) => {
1493                    bail_unsupported!("Source export with default value")
1494                }
1495                ColumnOption::Unique { is_primary } => {
1496                    keys.push(vec![i]);
1497                    if *is_primary {
1498                        nullable = false;
1499                    }
1500                }
1501                other => {
1502                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1503                }
1504            }
1505        }
1506        column_types.push(ty.nullable(nullable));
1507    }
1508
1509    let mut seen_primary = false;
1510    'c: for constraint in constraints {
1511        match constraint {
1512            TableConstraint::Unique {
1513                name: _,
1514                columns,
1515                is_primary,
1516                nulls_not_distinct,
1517            } => {
1518                if seen_primary && *is_primary {
1519                    sql_bail!(
1520                        "multiple primary keys for source export {} are not allowed",
1521                        name.to_ast_string_stable()
1522                    );
1523                }
1524                seen_primary = *is_primary || seen_primary;
1525
1526                let mut key = vec![];
1527                for column in columns {
1528                    let column = normalize::column_name(column.clone());
1529                    match names.iter().position(|name| *name == column) {
1530                        None => sql_bail!("unknown column in constraint: {}", column),
1531                        Some(i) => {
1532                            let nullable = &mut column_types[i].nullable;
1533                            if *is_primary {
1534                                if *nulls_not_distinct {
1535                                    sql_bail!(
1536                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1537                                    );
1538                                }
1539                                *nullable = false;
1540                            } else if *nullable && !*nulls_not_distinct {
1541                                // Non-primary-key unique constraints are only keys if all of their
1542                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1543                                break 'c;
1544                            }
1545
1546                            key.push(i);
1547                        }
1548                    }
1549                }
1550
1551                if *is_primary {
1552                    keys.insert(0, key);
1553                } else {
1554                    keys.push(key);
1555                }
1556            }
1557            TableConstraint::ForeignKey { .. } => {
1558                bail_unsupported!("Source export with a foreign key")
1559            }
1560            TableConstraint::Check { .. } => {
1561                bail_unsupported!("Source export with a check constraint")
1562            }
1563        }
1564    }
1565
1566    let typ = SqlRelationType::new(column_types).with_keys(keys);
1567    let desc = RelationDesc::new(typ, names);
1568    Ok(desc)
1569}
1570
1571generate_extracted_config!(
1572    CreateSubsourceOption,
1573    (Progress, bool, Default(false)),
1574    (ExternalReference, UnresolvedItemName),
1575    (RetainHistory, OptionalDuration),
1576    (TextColumns, Vec::<Ident>, Default(vec![])),
1577    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1578    (Details, String)
1579);
1580
1581pub fn plan_create_subsource(
1582    scx: &StatementContext,
1583    stmt: CreateSubsourceStatement<Aug>,
1584) -> Result<Plan, PlanError> {
1585    let CreateSubsourceStatement {
1586        name,
1587        columns,
1588        of_source,
1589        constraints,
1590        if_not_exists,
1591        with_options,
1592    } = &stmt;
1593
1594    let CreateSubsourceOptionExtracted {
1595        progress,
1596        retain_history,
1597        external_reference,
1598        text_columns,
1599        exclude_columns,
1600        details,
1601        seen: _,
1602    } = with_options.clone().try_into()?;
1603
1604    // This invariant is enforced during purification; we are responsible for
1605    // creating the AST for subsources as a response to CREATE SOURCE
1606    // statements, so this would fire in integration testing if we failed to
1607    // uphold it.
1608    assert!(
1609        progress ^ (external_reference.is_some() && of_source.is_some()),
1610        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1611    );
1612
1613    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615    let data_source = if let Some(source_reference) = of_source {
1616        // If the new source table syntax is forced, we should not be creating any
1617        // non-progress subsources.
1618        if scx.catalog.system_vars().enable_create_table_from_source()
1619            && scx.catalog.system_vars().force_source_table_syntax()
1620        {
1621            Err(PlanError::UseTablesForSources(
1622                "CREATE SUBSOURCE".to_string(),
1623            ))?;
1624        }
1625
1626        // This is a subsource with the "natural" dependency order, i.e. it is
1627        // not a legacy subsource with the inverted structure.
1628        let ingestion_id = *source_reference.item_id();
1629        let external_reference = external_reference.unwrap();
1630
1631        // Decode the details option stored on the subsource statement, which contains information
1632        // created during the purification process.
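        // The details round-trip as: `SourceExportStatementDetails` -> protobuf
        // bytes -> hex string embedded in the AST; the decoding below reverses
        // that chain.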
1633        let details = details
1634            .as_ref()
1635            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1636        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1637        let details =
1638            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1639        let details =
1640            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1641        let details = match details {
1642            SourceExportStatementDetails::Postgres { table } => {
1643                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1644                    column_casts: crate::pure::postgres::generate_column_casts(
1645                        scx,
1646                        &table,
1647                        &text_columns,
1648                    )?,
1649                    table,
1650                })
1651            }
1652            SourceExportStatementDetails::MySql {
1653                table,
1654                initial_gtid_set,
1655            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1656                table,
1657                initial_gtid_set,
1658                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1659                exclude_columns: exclude_columns
1660                    .into_iter()
1661                    .map(|c| c.into_string())
1662                    .collect(),
1663            }),
1664            SourceExportStatementDetails::SqlServer {
1665                table,
1666                capture_instance,
1667                initial_lsn,
1668            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1669                capture_instance,
1670                table,
1671                initial_lsn,
1672                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1673                exclude_columns: exclude_columns
1674                    .into_iter()
1675                    .map(|c| c.into_string())
1676                    .collect(),
1677            }),
1678            SourceExportStatementDetails::LoadGenerator { output } => {
1679                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1680            }
1681            SourceExportStatementDetails::Kafka {} => {
1682                bail_unsupported!("subsources cannot reference Kafka sources")
1683            }
1684        };
1685        DataSourceDesc::IngestionExport {
1686            ingestion_id,
1687            external_reference,
1688            details,
1689            // Subsources don't currently support non-default envelopes or encodings.
1690            data_config: SourceExportDataConfig {
1691                envelope: SourceEnvelope::None(NoneEnvelope {
1692                    key_envelope: KeyEnvelope::None,
1693                    key_arity: 0,
1694                }),
1695                encoding: None,
1696            },
1697        }
1698    } else if progress {
1699        DataSourceDesc::Progress
1700    } else {
1701        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1702    };
1703
1704    let if_not_exists = *if_not_exists;
1705    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1706
1707    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1708
1709    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1710    let source = Source {
1711        create_sql,
1712        data_source,
1713        desc,
1714        compaction_window,
1715    };
1716
1717    Ok(Plan::CreateSource(CreateSourcePlan {
1718        name,
1719        source,
1720        if_not_exists,
1721        timeline: Timeline::EpochMilliseconds,
1722        in_cluster: None,
1723    }))
1724}
1725
1726generate_extracted_config!(
1727    TableFromSourceOption,
1728    (TextColumns, Vec::<Ident>, Default(vec![])),
1729    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1730    (PartitionBy, Vec<Ident>),
1731    (RetainHistory, OptionalDuration),
1732    (Details, String)
1733);
1734
1735pub fn plan_create_table_from_source(
1736    scx: &StatementContext,
1737    stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739    if !scx.catalog.system_vars().enable_create_table_from_source() {
1740        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741    }
1742
1743    let CreateTableFromSourceStatement {
1744        name,
1745        columns,
1746        constraints,
1747        if_not_exists,
1748        source,
1749        external_reference,
1750        envelope,
1751        format,
1752        include_metadata,
1753        with_options,
1754    } = &stmt;
1755
1756    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758    let TableFromSourceOptionExtracted {
1759        text_columns,
1760        exclude_columns,
1761        retain_history,
1762        partition_by,
1763        details,
1764        seen: _,
1765    } = with_options.clone().try_into()?;
1766
1767    let source_item = scx.get_item_by_resolved_name(source)?;
1768    let ingestion_id = source_item.id();
1769
1770    // Decode the details option stored on the statement, which contains information
1771    // created during the purification process.
1772    let details = details
1773        .as_ref()
1774        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776    let details =
1777        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778    let details =
1779        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782        && include_metadata
1783            .iter()
1784            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785    {
1786        // TODO(guswynn): should this be `bail_unsupported!`?
1787        sql_bail!("INCLUDE HEADERS with a non-Kafka source table is not supported");
1788    }
1789    if !matches!(
1790        details,
1791        SourceExportStatementDetails::Kafka { .. }
1792            | SourceExportStatementDetails::LoadGenerator { .. }
1793    ) && !include_metadata.is_empty()
1794    {
1795        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796    }
1797
1798    let details = match details {
1799        SourceExportStatementDetails::Postgres { table } => {
1800            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1801                column_casts: crate::pure::postgres::generate_column_casts(
1802                    scx,
1803                    &table,
1804                    &text_columns,
1805                )?,
1806                table,
1807            })
1808        }
1809        SourceExportStatementDetails::MySql {
1810            table,
1811            initial_gtid_set,
1812        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1813            table,
1814            initial_gtid_set,
1815            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1816            exclude_columns: exclude_columns
1817                .into_iter()
1818                .map(|c| c.into_string())
1819                .collect(),
1820        }),
1821        SourceExportStatementDetails::SqlServer {
1822            table,
1823            capture_instance,
1824            initial_lsn,
1825        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1826            table,
1827            capture_instance,
1828            initial_lsn,
1829            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830            exclude_columns: exclude_columns
1831                .into_iter()
1832                .map(|c| c.into_string())
1833                .collect(),
1834        }),
1835        SourceExportStatementDetails::LoadGenerator { output } => {
1836            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1837        }
1838        SourceExportStatementDetails::Kafka {} => {
1839            if !include_metadata.is_empty()
1840                && !matches!(
1841                    envelope,
1842                    ast::SourceEnvelope::Upsert { .. }
1843                        | ast::SourceEnvelope::None
1844                        | ast::SourceEnvelope::Debezium
1845                )
1846            {
1847                // TODO(guswynn): should this be `bail_unsupported!`?
1848                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1849            }
1850
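            // For example (illustrative only, approximate syntax):
            // `INCLUDE TIMESTAMP AS ts, PARTITION, HEADER 'trace-id' AS tid`
            // yields the metadata columns (ts, Timestamp), (partition, Partition),
            // and (tid, Header { key: "trace-id", .. }).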
1851            let metadata_columns = include_metadata
1852                .into_iter()
1853                .flat_map(|item| match item {
1854                    SourceIncludeMetadata::Timestamp { alias } => {
1855                        let name = match alias {
1856                            Some(name) => name.to_string(),
1857                            None => "timestamp".to_owned(),
1858                        };
1859                        Some((name, KafkaMetadataKind::Timestamp))
1860                    }
1861                    SourceIncludeMetadata::Partition { alias } => {
1862                        let name = match alias {
1863                            Some(name) => name.to_string(),
1864                            None => "partition".to_owned(),
1865                        };
1866                        Some((name, KafkaMetadataKind::Partition))
1867                    }
1868                    SourceIncludeMetadata::Offset { alias } => {
1869                        let name = match alias {
1870                            Some(name) => name.to_string(),
1871                            None => "offset".to_owned(),
1872                        };
1873                        Some((name, KafkaMetadataKind::Offset))
1874                    }
1875                    SourceIncludeMetadata::Headers { alias } => {
1876                        let name = match alias {
1877                            Some(name) => name.to_string(),
1878                            None => "headers".to_owned(),
1879                        };
1880                        Some((name, KafkaMetadataKind::Headers))
1881                    }
1882                    SourceIncludeMetadata::Header {
1883                        alias,
1884                        key,
1885                        use_bytes,
1886                    } => Some((
1887                        alias.to_string(),
1888                        KafkaMetadataKind::Header {
1889                            key: key.clone(),
1890                            use_bytes: *use_bytes,
1891                        },
1892                    )),
1893                    SourceIncludeMetadata::Key { .. } => {
1894                        // handled below
1895                        None
1896                    }
1897                })
1898                .collect();
1899
1900            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1901        }
1902    };
1903
1904    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1905
1906    // Some source types (e.g. postgres, mysql, multi-output load-gen sources) define a
1907    // value schema during purification and populate the `columns` and `constraints` fields
1908    // of the statement. Other source types (e.g. kafka, single-output load-gen sources) do
1909    // not, so for those we use the source connection's default schema instead.
1910    let (key_desc, value_desc) =
1911        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1912            let columns = match columns {
1913                TableFromSourceColumns::Defined(columns) => columns,
1914                _ => unreachable!(),
1915            };
1916            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1917            (None, desc)
1918        } else {
1919            let key_desc = source_connection.default_key_desc();
1920            let value_desc = source_connection.default_value_desc();
1921            (Some(key_desc), value_desc)
1922        };
1923
1924    let metadata_columns_desc = match &details {
1925        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1926            metadata_columns, ..
1927        }) => kafka_metadata_columns_desc(metadata_columns),
1928        _ => vec![],
1929    };
1930
1931    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1932        scx,
1933        &envelope,
1934        format,
1935        key_desc,
1936        value_desc,
1937        include_metadata,
1938        metadata_columns_desc,
1939        source_connection,
1940    )?;
1941    if let TableFromSourceColumns::Named(col_names) = columns {
1942        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1943    }
1944
1945    let names: Vec<_> = desc.iter_names().cloned().collect();
1946    if let Some(dup) = names.iter().duplicates().next() {
1947        sql_bail!("column {} specified more than once", dup.quoted());
1948    }
1949
1950    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1951
1952    // Allow users to specify a timeline. If they do not, determine a default
1953    // timeline for the source.
1954    let timeline = match envelope {
1955        SourceEnvelope::CdcV2 => {
1956            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1957        }
1958        _ => Timeline::EpochMilliseconds,
1959    };
1960
1961    if let Some(partition_by) = partition_by {
1962        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1963        check_partition_by(&desc, partition_by)?;
1964    }
1965
1966    let data_source = DataSourceDesc::IngestionExport {
1967        ingestion_id,
1968        external_reference: external_reference
1969            .as_ref()
1970            .expect("populated in purification")
1971            .clone(),
1972        details,
1973        data_config: SourceExportDataConfig { envelope, encoding },
1974    };
1975
1976    let if_not_exists = *if_not_exists;
1977
1978    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1979
1980    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1981    let table = Table {
1982        create_sql,
1983        desc: VersionedRelationDesc::new(desc),
1984        temporary: false,
1985        compaction_window,
1986        data_source: TableDataSource::DataSource {
1987            desc: data_source,
1988            timeline,
1989        },
1990    };
1991
1992    Ok(Plan::CreateTable(CreateTablePlan {
1993        name,
1994        table,
1995        if_not_exists,
1996    }))
1997}
1998
1999generate_extracted_config!(
2000    LoadGeneratorOption,
2001    (TickInterval, Duration),
2002    (AsOf, u64, Default(0_u64)),
2003    (UpTo, u64, Default(u64::MAX)),
2004    (ScaleFactor, f64),
2005    (MaxCardinality, u64),
2006    (Keys, u64),
2007    (SnapshotRounds, u64),
2008    (TransactionalSnapshot, bool),
2009    (ValueSize, u64),
2010    (Seed, u64),
2011    (Partitions, u64),
2012    (BatchSize, u64)
2013);
2014
2015impl LoadGeneratorOptionExtracted {
2016    pub(super) fn ensure_only_valid_options(
2017        &self,
2018        loadgen: &ast::LoadGenerator,
2019    ) -> Result<(), PlanError> {
2020        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2021
2022        let mut options = self.seen.clone();
2023
2024        let permitted_options: &[_] = match loadgen {
2025            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2026            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2027            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2028            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2029            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2030            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2031            ast::LoadGenerator::KeyValue => &[
2032                TickInterval,
2033                Keys,
2034                SnapshotRounds,
2035                TransactionalSnapshot,
2036                ValueSize,
2037                Seed,
2038                Partitions,
2039                BatchSize,
2040            ],
2041        };
2042
2043        for o in permitted_options {
2044            options.remove(o);
2045        }
2046
2047        if !options.is_empty() {
2048            sql_bail!(
2049                "{} load generators do not support {} values",
2050                loadgen,
2051                options.iter().join(", ")
2052            )
2053        }
2054
2055        Ok(())
2056    }
2057}
2058
2059pub(crate) fn load_generator_ast_to_generator(
2060    scx: &StatementContext,
2061    loadgen: &ast::LoadGenerator,
2062    options: &[LoadGeneratorOption<Aug>],
2063    include_metadata: &[SourceIncludeMetadata],
2064) -> Result<LoadGenerator, PlanError> {
2065    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2066    extracted.ensure_only_valid_options(loadgen)?;
2067
2068    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2069        sql_bail!("INCLUDE metadata is only supported with `KEY VALUE` load generators");
2070    }
2071
2072    let load_generator = match loadgen {
2073        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2074        ast::LoadGenerator::Clock => {
2075            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2076            LoadGenerator::Clock
2077        }
2078        ast::LoadGenerator::Counter => {
2079            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2080            let LoadGeneratorOptionExtracted {
2081                max_cardinality, ..
2082            } = extracted;
2083            LoadGenerator::Counter { max_cardinality }
2084        }
2085        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2086        ast::LoadGenerator::Datums => {
2087            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2088            LoadGenerator::Datums
2089        }
2090        ast::LoadGenerator::Tpch => {
2091            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2092
2093            // Default to 0.01 scale factor (=10MB).
2094            let sf: f64 = scale_factor.unwrap_or(0.01);
2095            if !sf.is_finite() || sf < 0.0 {
2096                sql_bail!("unsupported scale factor {sf}");
2097            }
2098
2099            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2100                let total = (sf * multiplier).floor();
2101                let mut i = i64::try_cast_from(total)
2102                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2103                if i < 1 {
2104                    i = 1;
2105                }
2106                Ok(i)
2107            };
2108
2109            // The multiplications here are safely unchecked because they will
2110            // overflow to infinity, which will be caught by `f_to_i`.
2111            let count_supplier = f_to_i(10_000f64)?;
2112            let count_part = f_to_i(200_000f64)?;
2113            let count_customer = f_to_i(150_000f64)?;
2114            let count_orders = f_to_i(150_000f64 * 10f64)?;
2115            let count_clerk = f_to_i(1_000f64)?;
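            // e.g. (illustrative): the default sf = 0.01 yields count_supplier =
            // 100, count_part = 2_000, count_customer = 1_500, count_orders =
            // 15_000, and count_clerk = 10.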
2116
2117            LoadGenerator::Tpch {
2118                count_supplier,
2119                count_part,
2120                count_customer,
2121                count_orders,
2122                count_clerk,
2123            }
2124        }
2125        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2126            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2127            let LoadGeneratorOptionExtracted {
2128                keys,
2129                snapshot_rounds,
2130                transactional_snapshot,
2131                value_size,
2132                tick_interval,
2133                seed,
2134                partitions,
2135                batch_size,
2136                ..
2137            } = extracted;
2138
2139            let mut include_offset = None;
2140            for im in include_metadata {
2141                match im {
2142                    SourceIncludeMetadata::Offset { alias } => {
2143                        include_offset = match alias {
2144                            Some(alias) => Some(alias.to_string()),
2145                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2146                        }
2147                    }
2148                    SourceIncludeMetadata::Key { .. } => continue,
2149
2150                    _ => {
2151                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2152                    }
2153                };
2154            }
2155
2156            let lgkv = KeyValueLoadGenerator {
2157                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2158                snapshot_rounds: snapshot_rounds
2159                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2160                // Defaults to true.
2161                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2162                value_size: value_size
2163                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2164                partitions: partitions
2165                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2166                tick_interval,
2167                batch_size: batch_size
2168                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2169                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2170                include_offset,
2171            };
2172
2173            if lgkv.keys == 0
2174                || lgkv.partitions == 0
2175                || lgkv.value_size == 0
2176                || lgkv.batch_size == 0
2177            {
2178                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2179            }
2180
2181            if lgkv.keys % lgkv.partitions != 0 {
2182                sql_bail!("KEYS must be a multiple of PARTITIONS")
2183            }
2184
2185            if lgkv.batch_size > lgkv.keys {
2186                sql_bail!("BATCH SIZE must not be larger than KEYS")
2187            }
2188
2189            // This constraint simplifies the source implementation;
2190            // we can lift it later.
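            // e.g. (illustrative): KEYS 128, PARTITIONS 4, BATCH SIZE 8 is
            // accepted, since 128 / 4 = 32 keys per partition, which 8 divides
            // evenly.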
2191            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2192                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2193            }
2194
2195            if lgkv.snapshot_rounds == 0 {
2196                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2197            }
2198
2199            LoadGenerator::KeyValue(lgkv)
2200        }
2201    };
2202
2203    Ok(load_generator)
2204}
2205
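/// Checks that a Debezium-formatted value relation has the expected shape: a
/// required `after` column and, optionally, a `before` record column whose
/// type matches `after`'s. Returns the index of `before` (if present) and the
/// index of `after`.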
2206fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2207    let before = value_desc.get_by_name(&"before".into());
2208    let (after_idx, after_ty) = value_desc
2209        .get_by_name(&"after".into())
2210        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2211    let before_idx = if let Some((before_idx, before_ty)) = before {
2212        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2213            sql_bail!("'before' column must be of type record");
2214        }
2215        if before_ty != after_ty {
2216            sql_bail!("'before' column type differs from 'after' column type");
2217        }
2218        Some(before_idx)
2219    } else {
2220        None
2221    };
2222    Ok((before_idx, after_idx))
2223}
2224
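/// Plans the key and value encodings for the given format specifier, and
/// verifies that envelopes requiring a key (`DEBEZIUM`, `UPSERT`) were given
/// a key format.
///
/// For example (illustrative only), `KEY FORMAT TEXT VALUE FORMAT JSON
/// ENVELOPE UPSERT` plans to a `SourceDataEncoding` with both key and value
/// encodings set, while a bare `FORMAT JSON ENVELOPE UPSERT` is rejected.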
2225fn get_encoding(
2226    scx: &StatementContext,
2227    format: &FormatSpecifier<Aug>,
2228    envelope: &ast::SourceEnvelope,
2229) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2230    let encoding = match format {
2231        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2232        FormatSpecifier::KeyValue { key, value } => {
2233            let key = {
2234                let encoding = get_encoding_inner(scx, key)?;
2235                Some(encoding.key.unwrap_or(encoding.value))
2236            };
2237            let value = get_encoding_inner(scx, value)?.value;
2238            SourceDataEncoding { key, value }
2239        }
2240    };
2241
2242    let requires_keyvalue = matches!(
2243        envelope,
2244        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2245    );
2246    let is_keyvalue = encoding.key.is_some();
2247    if requires_keyvalue && !is_keyvalue {
2248        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2249    };
2250
2251    Ok(encoding)
2252}
2253
2254/// Determine the cluster ID to use for this item.
2255///
2256/// If `in_cluster` is `None`, we will update it to refer to the default cluster.
2257/// Because of this, do not normalize/canonicalize the create SQL statement
2258/// until after calling this function.
2259fn source_sink_cluster_config<'a, 'ctx>(
2260    scx: &'a StatementContext<'ctx>,
2261    in_cluster: &mut Option<ResolvedClusterName>,
2262) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2263    let cluster = match in_cluster {
2264        None => {
2265            let cluster = scx.catalog.resolve_cluster(None)?;
2266            *in_cluster = Some(ResolvedClusterName {
2267                id: cluster.id(),
2268                print_name: None,
2269            });
2270            cluster
2271        }
2272        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2273    };
2274
2275    Ok(cluster)
2276}
2277
2278generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2279
2280#[derive(Debug)]
2281pub struct Schema {
2282    pub key_schema: Option<String>,
2283    pub value_schema: String,
2284    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2285    pub confluent_wire_format: bool,
2286}
2287
2288fn get_encoding_inner(
2289    scx: &StatementContext,
2290    format: &Format<Aug>,
2291) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2292    let value = match format {
2293        Format::Bytes => DataEncoding::Bytes,
2294        Format::Avro(schema) => {
2295            let Schema {
2296                key_schema,
2297                value_schema,
2298                csr_connection,
2299                confluent_wire_format,
2300            } = match schema {
2301                // TODO(jldlaughlin): we need a way to pass in primary key information
2302                // when building a source from a string or file.
2303                AvroSchema::InlineSchema {
2304                    schema: ast::Schema { schema },
2305                    with_options,
2306                } => {
2307                    let AvroSchemaOptionExtracted {
2308                        confluent_wire_format,
2309                        ..
2310                    } = with_options.clone().try_into()?;
2311
2312                    Schema {
2313                        key_schema: None,
2314                        value_schema: schema.clone(),
2315                        csr_connection: None,
2316                        confluent_wire_format,
2317                    }
2318                }
2319                AvroSchema::Csr {
2320                    csr_connection:
2321                        CsrConnectionAvro {
2322                            connection,
2323                            seed,
2324                            key_strategy: _,
2325                            value_strategy: _,
2326                        },
2327                } => {
2328                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2329                    let csr_connection = match item.connection()? {
2330                        Connection::Csr(_) => item.id(),
2331                        _ => {
2332                            sql_bail!(
2333                                "{} is not a schema registry connection",
2334                                scx.catalog
2335                                    .resolve_full_name(item.name())
2336                                    .to_string()
2337                                    .quoted()
2338                            )
2339                        }
2340                    };
2341
2342                    if let Some(seed) = seed {
2343                        Schema {
2344                            key_schema: seed.key_schema.clone(),
2345                            value_schema: seed.value_schema.clone(),
2346                            csr_connection: Some(csr_connection),
2347                            confluent_wire_format: true,
2348                        }
2349                    } else {
2350                        unreachable!("CSR seed resolution should already have been called: Avro")
2351                    }
2352                }
2353            };
2354
2355            if let Some(key_schema) = key_schema {
2356                return Ok(SourceDataEncoding {
2357                    key: Some(DataEncoding::Avro(AvroEncoding {
2358                        schema: key_schema,
2359                        csr_connection: csr_connection.clone(),
2360                        confluent_wire_format,
2361                    })),
2362                    value: DataEncoding::Avro(AvroEncoding {
2363                        schema: value_schema,
2364                        csr_connection,
2365                        confluent_wire_format,
2366                    }),
2367                });
2368            } else {
2369                DataEncoding::Avro(AvroEncoding {
2370                    schema: value_schema,
2371                    csr_connection,
2372                    confluent_wire_format,
2373                })
2374            }
2375        }
2376        Format::Protobuf(schema) => match schema {
2377            ProtobufSchema::Csr {
2378                csr_connection:
2379                    CsrConnectionProtobuf {
2380                        connection:
2381                            CsrConnection {
2382                                connection,
2383                                options,
2384                            },
2385                        seed,
2386                    },
2387            } => {
2388                if let Some(CsrSeedProtobuf { key, value }) = seed {
2389                    let item = scx.get_item_by_resolved_name(connection)?;
2390                    let _ = match item.connection()? {
2391                        Connection::Csr(connection) => connection,
2392                        _ => {
2393                            sql_bail!(
2394                                "{} is not a schema registry connection",
2395                                scx.catalog
2396                                    .resolve_full_name(item.name())
2397                                    .to_string()
2398                                    .quoted()
2399                            )
2400                        }
2401                    };
2402
2403                    if !options.is_empty() {
2404                        sql_bail!("Protobuf CSR connections do not support any options");
2405                    }
2406
2407                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2408                        descriptors: strconv::parse_bytes(&value.schema)?,
2409                        message_name: value.message_name.clone(),
2410                        confluent_wire_format: true,
2411                    });
2412                    if let Some(key) = key {
2413                        return Ok(SourceDataEncoding {
2414                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2415                                descriptors: strconv::parse_bytes(&key.schema)?,
2416                                message_name: key.message_name.clone(),
2417                                confluent_wire_format: true,
2418                            })),
2419                            value,
2420                        });
2421                    }
2422                    value
2423                } else {
2424                    unreachable!("CSR seed resolution should already have been called: Proto")
2425                }
2426            }
2427            ProtobufSchema::InlineSchema {
2428                message_name,
2429                schema: ast::Schema { schema },
2430            } => {
2431                let descriptors = strconv::parse_bytes(schema)?;
2432
2433                DataEncoding::Protobuf(ProtobufEncoding {
2434                    descriptors,
2435                    message_name: message_name.to_owned(),
2436                    confluent_wire_format: false,
2437                })
2438            }
2439        },
2440        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2441            regex: mz_repr::adt::regex::Regex::new(regex, false)
2442                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2443        }),
2444        Format::Csv { columns, delimiter } => {
2445            let columns = match columns {
2446                CsvColumns::Header { names } => {
2447                    if names.is_empty() {
2448                        sql_bail!("[internal error] column spec should get names in purify")
2449                    }
2450                    ColumnSpec::Header {
2451                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2452                    }
2453                }
2454                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2455            };
2456            DataEncoding::Csv(CsvEncoding {
2457                columns,
2458                delimiter: u8::try_from(*delimiter)
2459                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2460            })
2461        }
2462        Format::Json { array: false } => DataEncoding::Json,
2463        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2464        Format::Text => DataEncoding::Text,
2465    };
2466    Ok(SourceDataEncoding { key: None, value })
2467}
2468
2469/// Extract the key envelope, if it is requested
2470fn get_key_envelope(
2471    included_items: &[SourceIncludeMetadata],
2472    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2473    key_envelope_no_encoding: bool,
2474) -> Result<KeyEnvelope, PlanError> {
2475    let key_definition = included_items
2476        .iter()
2477        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2478    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2479        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2480            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2481            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2482            (Some(name), _) if key_envelope_no_encoding => {
2483                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2484            }
2485            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2486            (_, None) => {
2487                // `alias == None` means `INCLUDE KEY`;
2488                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
2489                // Both cases make sense with the same error message.
2490                sql_bail!(
2491                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2492                        got bare FORMAT"
2493                );
2494            }
2495        }
2496    } else {
2497        Ok(KeyEnvelope::None)
2498    }
2499}
2500
2501/// Gets the key envelope for a given key encoding when no name for the key has
2502/// been requested by the user.
2503fn get_unnamed_key_envelope(
2504    key: Option<&DataEncoding<ReferencedConnection>>,
2505) -> Result<KeyEnvelope, PlanError> {
2506    // If the key is requested but comes from an unnamed type, then it gets the name "key".
2507    //
2508    // Otherwise it gets the names of the columns in the type.
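    //
    // For example (illustrative only): a `FORMAT TEXT` key is surfaced as a
    // single column named "key", while an Avro record key is flattened into
    // its constituent column names.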
2509    let is_composite = match key {
2510        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2511        Some(
2512            DataEncoding::Avro(_)
2513            | DataEncoding::Csv(_)
2514            | DataEncoding::Protobuf(_)
2515            | DataEncoding::Regex { .. },
2516        ) => true,
2517        None => false,
2518    };
2519
2520    if is_composite {
2521        Ok(KeyEnvelope::Flattened)
2522    } else {
2523        Ok(KeyEnvelope::Named("key".to_string()))
2524    }
2525}
2526
2527pub fn describe_create_view(
2528    _: &StatementContext,
2529    _: CreateViewStatement<Aug>,
2530) -> Result<StatementDesc, PlanError> {
2531    Ok(StatementDesc::new(None))
2532}
2533
2534pub fn plan_view(
2535    scx: &StatementContext,
2536    def: &mut ViewDefinition<Aug>,
2537    temporary: bool,
2538) -> Result<(QualifiedItemName, View), PlanError> {
2539    let create_sql = normalize::create_statement(
2540        scx,
2541        Statement::CreateView(CreateViewStatement {
2542            if_exists: IfExistsBehavior::Error,
2543            temporary,
2544            definition: def.clone(),
2545        }),
2546    )?;
2547
2548    let ViewDefinition {
2549        name,
2550        columns,
2551        query,
2552    } = def;
2553
2554    let query::PlannedRootQuery {
2555        expr,
2556        mut desc,
2557        finishing,
2558        scope: _,
2559    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2560    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2561    // Note: Earlier, we were thinking to maybe persist the finishing information with the view
2562    // here to help with database-issues#236. However, in the meantime, there might be a better
2563    // approach to solve database-issues#236:
2564    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2565    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2566        &finishing,
2567        expr.arity()
2568    ));
2569    if expr.contains_parameters()? {
2570        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2571    }
2572
2573    let dependencies = expr
2574        .depends_on()
2575        .into_iter()
2576        .map(|gid| scx.catalog.resolve_item_id(&gid))
2577        .collect();
2578
2579    let name = if temporary {
2580        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2581    } else {
2582        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2583    };
2584
2585    plan_utils::maybe_rename_columns(
2586        format!("view {}", scx.catalog.resolve_full_name(&name)),
2587        &mut desc,
2588        columns,
2589    )?;
2590    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2591
2592    if let Some(dup) = names.iter().duplicates().next() {
2593        sql_bail!("column {} specified more than once", dup.quoted());
2594    }
2595
2596    let view = View {
2597        create_sql,
2598        expr,
2599        dependencies,
2600        column_names: names,
2601        temporary,
2602    };
2603
2604    Ok((name, view))
2605}
2606
2607pub fn plan_create_view(
2608    scx: &StatementContext,
2609    mut stmt: CreateViewStatement<Aug>,
2610) -> Result<Plan, PlanError> {
2611    let CreateViewStatement {
2612        temporary,
2613        if_exists,
2614        definition,
2615    } = &mut stmt;
2616    let (name, view) = plan_view(scx, definition, *temporary)?;
2617
2618    // Override the statement-level IfExistsBehavior with Skip if this is
2619    // explicitly requested in the PlanContext (the default is `false`).
2620    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2621
2622    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2623        let if_exists = true;
2624        let cascade = false;
2625        let maybe_item_to_drop = plan_drop_item(
2626            scx,
2627            ObjectType::View,
2628            if_exists,
2629            definition.name.clone(),
2630            cascade,
2631        )?;
2632
2633        // Check if the new View depends on the item that we would be replacing.
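        // e.g. (illustrative): `CREATE OR REPLACE VIEW v AS SELECT ... FROM v`
        // must be rejected, because the new definition would depend on the very
        // item being replaced.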
2634        if let Some(id) = maybe_item_to_drop {
2635            let dependencies = view.expr.depends_on();
2636            let invalid_drop = scx
2637                .get_item(&id)
2638                .global_ids()
2639                .any(|gid| dependencies.contains(&gid));
2640            if invalid_drop {
2641                let item = scx.catalog.get_item(&id);
2642                sql_bail!(
2643                    "cannot replace view {0}: depended upon by new {0} definition",
2644                    scx.catalog.resolve_full_name(item.name())
2645                );
2646            }
2647
2648            Some(id)
2649        } else {
2650            None
2651        }
2652    } else {
2653        None
2654    };
2655    let drop_ids = replace
2656        .map(|id| {
2657            scx.catalog
2658                .item_dependents(id)
2659                .into_iter()
2660                .map(|id| id.unwrap_item_id())
2661                .collect()
2662        })
2663        .unwrap_or_default();
2664
2665    validate_view_dependencies(scx, &view.dependencies.0)?;
2666
2667    // Check for an object in the catalog with this same name
2668    let full_name = scx.catalog.resolve_full_name(&name);
2669    let partial_name = PartialItemName::from(full_name.clone());
2670    // For PostgreSQL compatibility, we need to prevent creating views when
2671    // there is an existing object *or* type of the same name.
2672    if let (Ok(item), IfExistsBehavior::Error, false) = (
2673        scx.catalog.resolve_item_or_type(&partial_name),
2674        *if_exists,
2675        ignore_if_exists_errors,
2676    ) {
2677        return Err(PlanError::ItemAlreadyExists {
2678            name: full_name.to_string(),
2679            item_type: item.item_type(),
2680        });
2681    }
2682
2683    Ok(Plan::CreateView(CreateViewPlan {
2684        name,
2685        view,
2686        replace,
2687        drop_ids,
2688        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2689        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2690    }))
2691}
2692
2693/// Validate the dependencies of a (materialized) view.
2694fn validate_view_dependencies(
2695    scx: &StatementContext,
2696    dependencies: &BTreeSet<CatalogItemId>,
2697) -> Result<(), PlanError> {
2698    for id in dependencies {
2699        let item = scx.catalog.get_item(id);
2700        if item.replacement_target().is_some() {
2701            let name = scx.catalog.minimal_qualification(item.name());
2702            return Err(PlanError::InvalidDependency {
2703                name: name.to_string(),
2704                item_type: format!("replacement {}", item.item_type()),
2705            });
2706        }
2707    }
2708
2709    Ok(())
2710}
2711
2712pub fn describe_create_materialized_view(
2713    _: &StatementContext,
2714    _: CreateMaterializedViewStatement<Aug>,
2715) -> Result<StatementDesc, PlanError> {
2716    Ok(StatementDesc::new(None))
2717}
2718
2719pub fn describe_create_continual_task(
2720    _: &StatementContext,
2721    _: CreateContinualTaskStatement<Aug>,
2722) -> Result<StatementDesc, PlanError> {
2723    Ok(StatementDesc::new(None))
2724}
2725
2726pub fn describe_create_network_policy(
2727    _: &StatementContext,
2728    _: CreateNetworkPolicyStatement<Aug>,
2729) -> Result<StatementDesc, PlanError> {
2730    Ok(StatementDesc::new(None))
2731}
2732
2733pub fn describe_alter_network_policy(
2734    _: &StatementContext,
2735    _: AlterNetworkPolicyStatement<Aug>,
2736) -> Result<StatementDesc, PlanError> {
2737    Ok(StatementDesc::new(None))
2738}
2739
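/// Plans a `CREATE MATERIALIZED VIEW` statement.
///
/// A sketch of one accepted form (names are hypothetical); the `WITH` options
/// extracted below are `ASSERT NOT NULL`, `PARTITION BY`, `RETAIN HISTORY`,
/// and `REFRESH`:
///
/// ```sql
/// CREATE MATERIALIZED VIEW mv
///     IN CLUSTER c
///     WITH (REFRESH EVERY '1 day' ALIGNED TO '2024-01-01 00:00:00')
///     AS SELECT a, b FROM t;
/// ```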
2740pub fn plan_create_materialized_view(
2741    scx: &StatementContext,
2742    mut stmt: CreateMaterializedViewStatement<Aug>,
2743) -> Result<Plan, PlanError> {
2744    let cluster_id =
2745        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2746    stmt.in_cluster = Some(ResolvedClusterName {
2747        id: cluster_id,
2748        print_name: None,
2749    });
2750
2751    let create_sql =
2752        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2753
2754    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2755    let name = scx.allocate_qualified_name(partial_name.clone())?;
2756
2757    let query::PlannedRootQuery {
2758        expr,
2759        mut desc,
2760        finishing,
2761        scope: _,
2762    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2763    // We get back a trivial finishing; see the comment in `plan_view`.
2764    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2765        &finishing,
2766        expr.arity()
2767    ));
2768    if expr.contains_parameters()? {
2769        return Err(PlanError::ParameterNotAllowed(
2770            "materialized views".to_string(),
2771        ));
2772    }
2773
2774    plan_utils::maybe_rename_columns(
2775        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2776        &mut desc,
2777        &stmt.columns,
2778    )?;
2779    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2780
2781    let MaterializedViewOptionExtracted {
2782        assert_not_null,
2783        partition_by,
2784        retain_history,
2785        refresh,
2786        seen: _,
2787    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2788
2789    if let Some(partition_by) = partition_by {
2790        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2791        check_partition_by(&desc, partition_by)?;
2792    }
2793
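    // Assemble the refresh schedule from the (possibly repeated) REFRESH
    // options. E.g., `REFRESH EVERY '1 day'` contributes an entry to
    // `everies` and `REFRESH AT '...'` an entry to `ats`; `REFRESH ON
    // COMMIT`, if present, must be the only REFRESH option.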
2794    let refresh_schedule = {
2795        let mut refresh_schedule = RefreshSchedule::default();
2796        let mut on_commits_seen = 0;
2797        for refresh_option_value in refresh {
2798            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2799                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2800            }
2801            match refresh_option_value {
2802                RefreshOptionValue::OnCommit => {
2803                    on_commits_seen += 1;
2804                }
2805                RefreshOptionValue::AtCreation => {
2806                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2807                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2808                }
2809                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2810                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2811                    let ecx = &ExprContext {
2812                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2813                        name: "REFRESH AT",
2814                        scope: &Scope::empty(),
2815                        relation_type: &SqlRelationType::empty(),
2816                        allow_aggregates: false,
2817                        allow_subqueries: false,
2818                        allow_parameters: false,
2819                        allow_windows: false,
2820                    };
2821                    let hir = plan_expr(ecx, &time)?.cast_to(
2822                        ecx,
2823                        CastContext::Assignment,
2824                        &SqlScalarType::MzTimestamp,
2825                    )?;
2826                    // (mz_now was purified away to a literal earlier)
2827                    let timestamp = hir
2828                        .into_literal_mz_timestamp()
2829                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2830                    refresh_schedule.ats.push(timestamp);
2831                }
2832                RefreshOptionValue::Every(RefreshEveryOptionValue {
2833                    interval,
2834                    aligned_to,
2835                }) => {
2836                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2837                    if interval.as_microseconds() <= 0 {
2838                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2839                    }
2840                    if interval.months != 0 {
2841                        // This limitation exists because we want intervals to be cleanly
2842                        // convertible to a unix epoch timestamp difference. That no longer
2843                        // holds when the interval involves months, because months have
2844                        // variable lengths. See `Timestamp::round_up`.
2845                        sql_bail!("REFRESH interval must not involve units larger than days");
2846                    }
2847                    let interval = interval.duration()?;
2848                    if u64::try_from(interval.as_millis()).is_err() {
2849                        sql_bail!("REFRESH interval too large");
2850                    }
2851
2852                    let mut aligned_to = match aligned_to {
2853                        Some(aligned_to) => aligned_to,
2854                        None => {
2855                            soft_panic_or_log!(
2856                                "ALIGNED TO should have been filled in by purification"
2857                            );
2858                            sql_bail!(
2859                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2860                            )
2861                        }
2862                    };
2863
2864                    // Desugar the `aligned_to` expression
2865                    transform_ast::transform(scx, &mut aligned_to)?;
2866
2867                    let ecx = &ExprContext {
2868                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2869                        name: "REFRESH EVERY ... ALIGNED TO",
2870                        scope: &Scope::empty(),
2871                        relation_type: &SqlRelationType::empty(),
2872                        allow_aggregates: false,
2873                        allow_subqueries: false,
2874                        allow_parameters: false,
2875                        allow_windows: false,
2876                    };
2877                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2878                        ecx,
2879                        CastContext::Assignment,
2880                        &SqlScalarType::MzTimestamp,
2881                    )?;
2882                    // (mz_now was purified away to a literal earlier)
2883                    let aligned_to_const = aligned_to_hir
2884                        .into_literal_mz_timestamp()
2885                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2886
2887                    refresh_schedule.everies.push(RefreshEvery {
2888                        interval,
2889                        aligned_to: aligned_to_const,
2890                    });
2891                }
2892            }
2893        }
2894
2895        if on_commits_seen > 1 {
2896            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2897        }
2898        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2899            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2900        }
2901
2902        if refresh_schedule == RefreshSchedule::default() {
2903            None
2904        } else {
2905            Some(refresh_schedule)
2906        }
2907    };
2908
2909    let as_of = stmt.as_of.map(Timestamp::from);
2910    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2911    let mut non_null_assertions = assert_not_null
2912        .into_iter()
2913        .map(normalize::column_name)
2914        .map(|assertion_name| {
2915            column_names
2916                .iter()
2917                .position(|col| col == &assertion_name)
2918                .ok_or_else(|| {
2919                    sql_err!(
2920                        "column {} in ASSERT NOT NULL option not found",
2921                        assertion_name.quoted()
2922                    )
2923                })
2924        })
2925        .collect::<Result<Vec<_>, _>>()?;
2926    non_null_assertions.sort();
2927    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2928        let dup = &column_names[*dup];
2929        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2930    }
2931
2932    if let Some(dup) = column_names.iter().duplicates().next() {
2933        sql_bail!("column {} specified more than once", dup.quoted());
2934    }
2935
2936    // Override the statement-level IfExistsBehavior with Skip if this is
2937    // explicitly requested in the PlanContext (the default is `false`).
2938    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2939        Ok(true) => IfExistsBehavior::Skip,
2940        _ => stmt.if_exists,
2941    };
2942
2943    let mut replace = None;
2944    let mut if_not_exists = false;
2945    match if_exists {
2946        IfExistsBehavior::Replace => {
2947            let if_exists = true;
2948            let cascade = false;
2949            let replace_id = plan_drop_item(
2950                scx,
2951                ObjectType::MaterializedView,
2952                if_exists,
2953                partial_name.clone().into(),
2954                cascade,
2955            )?;
2956
2957            // Check whether the new materialized view depends on the item we would be replacing.
2958            if let Some(id) = replace_id {
2959                let dependencies = expr.depends_on();
2960                let invalid_drop = scx
2961                    .get_item(&id)
2962                    .global_ids()
2963                    .any(|gid| dependencies.contains(&gid));
2964                if invalid_drop {
2965                    let item = scx.catalog.get_item(&id);
2966                    sql_bail!(
2967                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2968                        scx.catalog.resolve_full_name(item.name())
2969                    );
2970                }
2971                replace = Some(id);
2972            }
2973        }
2974        IfExistsBehavior::Skip => if_not_exists = true,
2975        IfExistsBehavior::Error => (),
2976    }
2977    let drop_ids = replace
2978        .map(|id| {
2979            scx.catalog
2980                .item_dependents(id)
2981                .into_iter()
2982                .map(|id| id.unwrap_item_id())
2983                .collect()
2984        })
2985        .unwrap_or_default();
2986    let mut dependencies: BTreeSet<_> = expr
2987        .depends_on()
2988        .into_iter()
2989        .map(|gid| scx.catalog.resolve_item_id(&gid))
2990        .collect();
2991
2992    // Validate the replacement target, if one is given.
2993    let mut replacement_target = None;
2994    if let Some(target_name) = &stmt.replacement_for {
2995        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
2996
2997        let target = scx.get_item_by_resolved_name(target_name)?;
2998        if target.item_type() != CatalogItemType::MaterializedView {
2999            return Err(PlanError::InvalidReplacement {
3000                item_type: target.item_type(),
3001                item_name: scx.catalog.minimal_qualification(target.name()),
3002                replacement_type: CatalogItemType::MaterializedView,
3003                replacement_name: partial_name,
3004            });
3005        }
3006        if target.id().is_system() {
3007            sql_bail!(
3008                "cannot replace {} because it is required by the database system",
3009                scx.catalog.minimal_qualification(target.name()),
3010            );
3011        }
3012
3013        if !dependencies.insert(target.id()) {
3014            sql_bail!(
3015                "cannot replace {} because it is also a dependency",
3016                scx.catalog.minimal_qualification(target.name()),
3017            );
3018        }
3019
3020        for use_id in target.used_by() {
3021            let use_item = scx.get_item(use_id);
3022            if use_item.replacement_target() == Some(target.id()) {
3023                sql_bail!(
3024                    "cannot replace {} because it already has a replacement: {}",
3025                    scx.catalog.minimal_qualification(target.name()),
3026                    scx.catalog.minimal_qualification(use_item.name()),
3027                );
3028            }
3029        }
3030
3031        replacement_target = Some(target.id());
3032    }
3033
3034    validate_view_dependencies(scx, &dependencies)?;
3035
3036    // Check for an object in the catalog with this same name
3037    let full_name = scx.catalog.resolve_full_name(&name);
3038    let partial_name = PartialItemName::from(full_name.clone());
3039    // For PostgreSQL compatibility, we need to prevent creating materialized
3040    // views when there is an existing object *or* type of the same name.
3041    if let (IfExistsBehavior::Error, Ok(item)) =
3042        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3043    {
3044        return Err(PlanError::ItemAlreadyExists {
3045            name: full_name.to_string(),
3046            item_type: item.item_type(),
3047        });
3048    }
3049
3050    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3051        name,
3052        materialized_view: MaterializedView {
3053            create_sql,
3054            expr,
3055            dependencies: DependencyIds(dependencies),
3056            column_names,
3057            replacement_target,
3058            cluster_id,
3059            non_null_assertions,
3060            compaction_window,
3061            refresh_schedule,
3062            as_of,
3063        },
3064        replace,
3065        drop_ids,
3066        if_not_exists,
3067        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3068    }))
3069}
3070
3071generate_extracted_config!(
3072    MaterializedViewOption,
3073    (AssertNotNull, Ident, AllowMultiple),
3074    (PartitionBy, Vec<Ident>),
3075    (RetainHistory, OptionalDuration),
3076    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
3077);
3078
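/// Plans a `CREATE CONTINUAL TASK` statement.
///
/// Roughly, a continual task declares optional columns, an input, and a body
/// of `INSERT`/`DELETE` statements that run against the input's changes. A
/// sketch (names are hypothetical):
///
/// ```sql
/// CREATE CONTINUAL TASK ct (key int, val int) ON INPUT t AS (
///     DELETE FROM ct WHERE key IN (SELECT key FROM t);
///     INSERT INTO ct SELECT key, val FROM t;
/// );
/// ```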
3079pub fn plan_create_continual_task(
3080    scx: &StatementContext,
3081    mut stmt: CreateContinualTaskStatement<Aug>,
3082) -> Result<Plan, PlanError> {
3083    match &stmt.sugar {
3084        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3085        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3086            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3087        }
3088        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3089            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3090        }
3091    };
3092    let cluster_id = match &stmt.in_cluster {
3093        None => scx.catalog.resolve_cluster(None)?.id(),
3094        Some(in_cluster) => in_cluster.id,
3095    };
3096    stmt.in_cluster = Some(ResolvedClusterName {
3097        id: cluster_id,
3098        print_name: None,
3099    });
3100
3101    let create_sql =
3102        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3103
3104    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3105
3106    // It seems desirable for a CT that, e.g., simply filters the input to keep
3107    // the same nullability. So, start by assuming all columns are non-nullable,
3108    // and then make them nullable below if any of the exprs plan them as
3109    // nullable.
3110    let mut desc = match stmt.columns {
3111        None => None,
3112        Some(columns) => {
3113            let mut desc_columns = Vec::with_capacity(columns.len());
3114            for col in columns.iter() {
3115                desc_columns.push((
3116                    normalize::column_name(col.name.clone()),
3117                    SqlColumnType {
3118                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3119                        nullable: false,
3120                    },
3121                ));
3122            }
3123            Some(RelationDesc::from_names_and_types(desc_columns))
3124        }
3125    };
3126    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3127    match input.item_type() {
3128        // Input must be a thing directly backed by a persist shard, so we can
3129        // use a persist listen to efficiently rehydrate.
3130        CatalogItemType::ContinualTask
3131        | CatalogItemType::Table
3132        | CatalogItemType::MaterializedView
3133        | CatalogItemType::Source => {}
3134        CatalogItemType::Sink
3135        | CatalogItemType::View
3136        | CatalogItemType::Index
3137        | CatalogItemType::Type
3138        | CatalogItemType::Func
3139        | CatalogItemType::Secret
3140        | CatalogItemType::Connection => {
3141            sql_bail!(
3142                "CONTINUAL TASK cannot use {} as an input",
3143                input.item_type()
3144            );
3145        }
3146    }
3147
3148    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3149    let ct_name = stmt.name;
3150    let placeholder_id = match &ct_name {
3151        ResolvedItemName::ContinualTask { id, name } => {
3152            let desc = match desc.as_ref().cloned() {
3153                Some(x) => x,
3154                None => {
3155                    // The user didn't specify the CT's columns. Take a wild
3156                    // guess that the CT has the same shape as the input. It's
3157                    // fine if this is wrong; we'll get an error below after
3158                    // planning the query.
3159                    let desc = input.relation_desc().expect("item type checked above");
3160                    desc.into_owned()
3161                }
3162            };
3163            qcx.ctes.insert(
3164                *id,
3165                CteDesc {
3166                    name: name.item.clone(),
3167                    desc,
3168                },
3169            );
3170            Some(*id)
3171        }
3172        _ => None,
3173    };
3174
3175    let mut exprs = Vec::new();
3176    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3177        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3178        let query::PlannedRootQuery {
3179            expr,
3180            desc: desc_query,
3181            finishing,
3182            scope: _,
3183        } = query::plan_ct_query(&mut qcx, query)?;
3184        // We get back a trivial finishing because we plan with a "maintained"
3185        // QueryLifetime; see the comment in `plan_view`.
3186        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3187            &finishing,
3188            expr.arity()
3189        ));
3190        if expr.contains_parameters()? {
3191            return Err(PlanError::ParameterNotAllowed(
3192                "continual tasks".to_string(),
3193            ));
3194        }
3197        let expr = match desc.as_mut() {
3198            None => {
3199                desc = Some(desc_query);
3200                expr
3201            }
3202            Some(desc) => {
3203                // We specify the columns for DELETE, so if any column types don't
3204                // match, it must be because it's an INSERT.
3205                if desc_query.arity() > desc.arity() {
3206                    sql_bail!(
3207                        "statement {}: INSERT has more expressions than target columns",
3208                        idx
3209                    );
3210                }
3211                if desc_query.arity() < desc.arity() {
3212                    sql_bail!(
3213                        "statement {}: INSERT has more target columns than expressions",
3214                        idx
3215                    );
3216                }
3217                // Ensure the types of the source query match the types of the target table,
3218                // installing assignment casts where necessary and possible.
3219                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3220                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3221                let expr = expr.map_err(|e| {
3222                    sql_err!(
3223                        "statement {}: column {} is of type {} but expression is of type {}",
3224                        idx,
3225                        desc.get_name(e.column).quoted(),
3226                        qcx.humanize_scalar_type(&e.target_type, false),
3227                        qcx.humanize_scalar_type(&e.source_type, false),
3228                    )
3229                })?;
3230
3231                // Update CT nullability as necessary. The arity checks above verified
3232                // that the two types have the same length.
3233                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3234                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3235                if updated {
3236                    let new_types = zip_types().map(|(ct, q)| {
3237                        let mut ct = ct.clone();
3238                        if q.nullable {
3239                            ct.nullable = true;
3240                        }
3241                        ct
3242                    });
3243                    *desc = RelationDesc::from_names_and_types(
3244                        desc.iter_names().cloned().zip_eq(new_types),
3245                    );
3246                }
3247
3248                expr
3249            }
3250        };
3251        match stmt {
3252            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3253            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3254        }
3255    }
3256    // TODO(ct3): Collect things by output and assert that there is only one (or
3257    // support multiple outputs).
3258    let expr = exprs
3259        .into_iter()
3260        .reduce(|acc, expr| acc.union(expr))
3261        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3262    let dependencies = expr
3263        .depends_on()
3264        .into_iter()
3265        .map(|gid| scx.catalog.resolve_item_id(&gid))
3266        .collect();
3267
3268    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3269    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3270    if let Some(dup) = column_names.iter().duplicates().next() {
3271        sql_bail!("column {} specified more than once", dup.quoted());
3272    }
3273
3274    // Check for an object in the catalog with this same name
3275    let name = match &ct_name {
3276        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3277        ResolvedItemName::ContinualTask { name, .. } => {
3278            let name = scx.allocate_qualified_name(name.clone())?;
3279            let full_name = scx.catalog.resolve_full_name(&name);
3280            let partial_name = PartialItemName::from(full_name.clone());
3281            // For PostgreSQL compatibility, we need to prevent creating this when there
3282            // is an existing object *or* type of the same name.
3283            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3284                return Err(PlanError::ItemAlreadyExists {
3285                    name: full_name.to_string(),
3286                    item_type: item.item_type(),
3287                });
3288            }
3289            name
3290        }
3291        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3292        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3293    };
3294
3295    let as_of = stmt.as_of.map(Timestamp::from);
3296    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3297        name,
3298        placeholder_id,
3299        desc,
3300        input_id: input.global_id(),
3301        with_snapshot: snapshot.unwrap_or(true),
3302        continual_task: MaterializedView {
3303            create_sql,
3304            expr,
3305            dependencies,
3306            column_names,
3307            replacement_target: None,
3308            cluster_id,
3309            non_null_assertions: Vec::new(),
3310            compaction_window: None,
3311            refresh_schedule: None,
3312            as_of,
3313        },
3314    }))
3315}
3316
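/// Converts one `INSERT` or `DELETE` statement from a continual task body into
/// the `SELECT` query to plan, or `None` if the statement has an unsupported
/// shape (e.g., explicit INSERT columns, RETURNING, or DELETE ... USING).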
3317fn continual_task_query<'a>(
3318    ct_name: &ResolvedItemName,
3319    stmt: &'a ast::ContinualTaskStmt<Aug>,
3320) -> Option<ast::Query<Aug>> {
3321    match stmt {
3322        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3323            table_name: _,
3324            columns,
3325            source,
3326            returning,
3327        }) => {
3328            if !columns.is_empty() || !returning.is_empty() {
3329                return None;
3330            }
3331            match source {
3332                ast::InsertSource::Query(query) => Some(query.clone()),
3333                ast::InsertSource::DefaultValues => None,
3334            }
3335        }
3336        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3337            table_name: _,
3338            alias,
3339            using,
3340            selection,
3341        }) => {
3342            if !using.is_empty() {
3343                return None;
3344            }
3345            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
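            // E.g., `DELETE FROM ct WHERE x > 5` (hypothetical) becomes
            // `SELECT * FROM ct WHERE x > 5`.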
3346            let from = ast::TableWithJoins {
3347                relation: ast::TableFactor::Table {
3348                    name: ct_name.clone(),
3349                    alias: alias.clone(),
3350                },
3351                joins: Vec::new(),
3352            };
3353            let select = ast::Select {
3354                from: vec![from],
3355                selection: selection.clone(),
3356                distinct: None,
3357                projection: vec![ast::SelectItem::Wildcard],
3358                group_by: Vec::new(),
3359                having: None,
3360                qualify: None,
3361                options: Vec::new(),
3362            };
3363            let query = ast::Query {
3364                ctes: ast::CteBlock::Simple(Vec::new()),
3365                body: ast::SetExpr::Select(Box::new(select)),
3366                order_by: Vec::new(),
3367                limit: None,
3368                offset: None,
3369            };
3370            // Then negate it to turn it into retractions (after planning it).
3371            Some(query)
3372        }
3373    }
3374}
3375
3376generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3377
3378pub fn describe_create_sink(
3379    _: &StatementContext,
3380    _: CreateSinkStatement<Aug>,
3381) -> Result<StatementDesc, PlanError> {
3382    Ok(StatementDesc::new(None))
3383}
3384
3385generate_extracted_config!(
3386    CreateSinkOption,
3387    (Snapshot, bool),
3388    (PartitionStrategy, String),
3389    (Version, u64),
3390    (CommitInterval, Duration)
3391);
3392
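/// Plans a `CREATE SINK` statement after checking for a name collision.
///
/// A sketch of the Kafka form handled here (names are hypothetical):
///
/// ```sql
/// CREATE SINK s FROM t
///     INTO KAFKA CONNECTION kafka_conn (TOPIC 'topic')
///     KEY (id)
///     FORMAT JSON
///     ENVELOPE UPSERT;
/// ```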
3393pub fn plan_create_sink(
3394    scx: &StatementContext,
3395    stmt: CreateSinkStatement<Aug>,
3396) -> Result<Plan, PlanError> {
3397    // Check for an object in the catalog with this same name
3398    let Some(name) = stmt.name.clone() else {
3399        return Err(PlanError::MissingName(CatalogItemType::Sink));
3400    };
3401    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3402    let full_name = scx.catalog.resolve_full_name(&name);
3403    let partial_name = PartialItemName::from(full_name.clone());
3404    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3405        return Err(PlanError::ItemAlreadyExists {
3406            name: full_name.to_string(),
3407            item_type: item.item_type(),
3408        });
3409    }
3410
3411    plan_sink(scx, stmt)
3412}
3413
3414/// Plans a sink as if it does not exist in the catalog. This is so the
3415/// planning logic can be reused by both CREATE SINK and ALTER SINK planning.
3416/// It is the responsibility of the callers (plan_create_sink and
3417/// plan_alter_sink) to check for name collisions where that matters.
3418fn plan_sink(
3419    scx: &StatementContext,
3420    mut stmt: CreateSinkStatement<Aug>,
3421) -> Result<Plan, PlanError> {
3422    let CreateSinkStatement {
3423        name,
3424        in_cluster: _,
3425        from,
3426        connection,
3427        format,
3428        envelope,
3429        if_not_exists,
3430        with_options,
3431    } = stmt.clone();
3432
3433    let Some(name) = name else {
3434        return Err(PlanError::MissingName(CatalogItemType::Sink));
3435    };
3436    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3437
3438    let envelope = match envelope {
3439        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3440        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3441        None => sql_bail!("ENVELOPE clause is required"),
3442    };
3443
3444    let from_name = &from;
3445    let from = scx.get_item_by_resolved_name(&from)?;
3446
3447    {
3448        use CatalogItemType::*;
3449        match from.item_type() {
3450            Table | Source | MaterializedView | ContinualTask => {
3451                if from.replacement_target().is_some() {
3452                    let name = scx.catalog.minimal_qualification(from.name());
3453                    return Err(PlanError::InvalidSinkFrom {
3454                        name: name.to_string(),
3455                        item_type: format!("replacement {}", from.item_type()),
3456                    });
3457                }
3458            }
3459            Sink | View | Index | Type | Func | Secret | Connection => {
3460                let name = scx.catalog.minimal_qualification(from.name());
3461                return Err(PlanError::InvalidSinkFrom {
3462                    name: name.to_string(),
3463                    item_type: from.item_type().to_string(),
3464                });
3465            }
3466        }
3467    }
3468
3469    if from.id().is_system() {
3470        bail_unsupported!("creating a sink directly on a catalog object");
3471    }
3472
3473    let desc = from.relation_desc().expect("item type checked above");
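    // If the statement declares a KEY, resolve the key columns to column
    // indices. For ENVELOPE UPSERT the declared key must cover one of the
    // relation's unique keys; `KEY (...) NOT ENFORCED` downgrades that error
    // to a notice.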
3474    let key_indices = match &connection {
3475        CreateSinkConnection::Kafka { key: Some(key), .. }
3476        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3477            let key_columns = key
3478                .key_columns
3479                .clone()
3480                .into_iter()
3481                .map(normalize::column_name)
3482                .collect::<Vec<_>>();
3483            let mut uniq = BTreeSet::new();
3484            for col in key_columns.iter() {
3485                if !uniq.insert(col) {
3486                    sql_bail!("duplicate column referenced in KEY: {}", col);
3487                }
3488            }
3489            let indices = key_columns
3490                .iter()
3491                .map(|col| -> anyhow::Result<usize> {
3492                    let name_idx =
3493                        desc.get_by_name(col)
3494                            .map(|(idx, _type)| idx)
3495                            .ok_or_else(|| {
3496                                sql_err!("column referenced in KEY does not exist: {}", col)
3497                            })?;
3498                    if desc.get_unambiguous_name(name_idx).is_none() {
3499                        return Err(sql_err!("column referenced in KEY is ambiguous: {}", col).into());
3500                    }
3501                    Ok(name_idx)
3502                })
3503                .collect::<Result<Vec<_>, _>>()?;
3504            let is_valid_key = desc
3505                .typ()
3506                .keys
3507                .iter()
3508                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3509
3510            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3511                if key.not_enforced {
3512                    scx.catalog
3513                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3514                            key: key_columns.clone(),
3515                            name: name.item.clone(),
3516                        })
3517                } else {
3518                    return Err(PlanError::UpsertSinkWithInvalidKey {
3519                        name: from_name.full_name_str(),
3520                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3521                        valid_keys: desc
3522                            .typ()
3523                            .keys
3524                            .iter()
3525                            .map(|key| {
3526                                key.iter()
3527                                    .map(|col| desc.get_name(*col).as_str().into())
3528                                    .collect()
3529                            })
3530                            .collect(),
3531                    });
3532                }
3533            }
3534            Some(indices)
3535        }
3536        CreateSinkConnection::Kafka { key: None, .. }
3537        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3538    };
3539
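    // Resolve the HEADERS column, if any. It must be an unambiguous column of
    // type `map[text => text]` or `map[text => bytea]`, and it is only
    // supported with ENVELOPE UPSERT.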
3540    let headers_index = match &connection {
3541        CreateSinkConnection::Kafka {
3542            headers: Some(headers),
3543            ..
3544        } => {
3545            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3546
3547            match envelope {
3548                SinkEnvelope::Upsert => (),
3549                SinkEnvelope::Debezium => {
3550                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3551                }
3552            };
3553
3554            let headers = normalize::column_name(headers.clone());
3555            let (idx, ty) = desc
3556                .get_by_name(&headers)
3557                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3558
3559            if desc.get_unambiguous_name(idx).is_none() {
3560                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3561            }
3562
3563            match &ty.scalar_type {
3564                SqlScalarType::Map { value_type, .. }
3565                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3566                _ => sql_bail!(
3567                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3568                ),
3569            }
3570
3571            Some(idx)
3572        }
3573        _ => None,
3574    };
3575
3576    // Pick the first valid natural relation key, if any.
3577    let relation_key_indices = desc.typ().keys.get(0).cloned();
3578
3579    let key_desc_and_indices = key_indices.map(|key_indices| {
3580        let cols = desc
3581            .iter()
3582            .map(|(name, ty)| (name.clone(), ty.clone()))
3583            .collect::<Vec<_>>();
3584        let (names, types): (Vec<_>, Vec<_>) =
3585            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3586        let typ = SqlRelationType::new(types);
3587        (RelationDesc::new(typ, names), key_indices)
3588    });
3589
3590    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3591        return Err(PlanError::UpsertSinkWithoutKey);
3592    }
3593
3594    let CreateSinkOptionExtracted {
3595        snapshot,
3596        version,
3597        partition_strategy: _,
3598        seen: _,
3599        commit_interval,
3600    } = with_options.try_into()?;
3601
3602    let connection_builder = match connection {
3603        CreateSinkConnection::Kafka {
3604            connection,
3605            options,
3606            ..
3607        } => kafka_sink_builder(
3608            scx,
3609            connection,
3610            options,
3611            format,
3612            relation_key_indices,
3613            key_desc_and_indices,
3614            headers_index,
3615            desc.into_owned(),
3616            envelope,
3617            from.id(),
3618            commit_interval,
3619        )?,
3620        CreateSinkConnection::Iceberg {
3621            connection,
3622            aws_connection,
3623            options,
3624            ..
3625        } => iceberg_sink_builder(
3626            scx,
3627            connection,
3628            aws_connection,
3629            options,
3630            relation_key_indices,
3631            key_desc_and_indices,
3632            commit_interval,
3633        )?,
3634    };
3635
3636    // WITH SNAPSHOT defaults to true
3637    let with_snapshot = snapshot.unwrap_or(true);
3638    // VERSION defaults to 0
3639    let version = version.unwrap_or(0);
3640
3641    // We will rewrite the cluster if one is not provided, so we must use the
3642    // `in_cluster` value we plan to normalize when we canonicalize the create
3643    // statement.
3644    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3645    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3646
3647    Ok(Plan::CreateSink(CreateSinkPlan {
3648        name,
3649        sink: Sink {
3650            create_sql,
3651            from: from.global_id(),
3652            connection: connection_builder,
3653            envelope,
3654            version,
3655            commit_interval,
3656        },
3657        with_snapshot,
3658        if_not_exists,
3659        in_cluster: in_cluster.id(),
3660    }))
3661}
3662
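/// Constructs the error reported when a user-declared key constraint conflicts
/// with the keys already known for the relation, listing both for context.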
3663fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3664    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3665
3666    let existing_keys = desc
3667        .typ()
3668        .keys
3669        .iter()
3670        .map(|key_columns| {
3671            key_columns
3672                .iter()
3673                .map(|col| desc.get_name(*col).as_str())
3674                .join(", ")
3675        })
3676        .join(", ");
3677
3678    sql_err!(
3679        "Key constraint ({}) conflicts with existing key ({})",
3680        user_keys,
3681        existing_keys
3682    )
3683}
3684
3685/// Created by hand instead of with the `generate_extracted_config!` macro
3686/// because that macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3687#[derive(Debug, Default, PartialEq, Clone)]
3688pub struct CsrConfigOptionExtracted {
3689    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3690    pub(crate) avro_key_fullname: Option<String>,
3691    pub(crate) avro_value_fullname: Option<String>,
3692    pub(crate) null_defaults: bool,
3693    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3694    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3695    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3696    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3697}
3698
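/// Extracts and validates the CSR config options. `DOC ON` comments scoped to
/// neither KEY nor VALUE apply to both schemas, but only where a more specific
/// `KEY`- or `VALUE`-scoped option hasn't already claimed the same target.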
3699impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3700    type Error = crate::plan::PlanError;
3701    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3702        let mut extracted = CsrConfigOptionExtracted::default();
3703        let mut common_doc_comments = BTreeMap::new();
3704        for option in v {
3705            if !extracted.seen.insert(option.name.clone()) {
3706                return Err(PlanError::Unstructured({
3707                    format!("{} specified more than once", option.name)
3708                }));
3709            }
3710            let option_name = option.name.clone();
3711            let option_name_str = option_name.to_ast_string_simple();
3712            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3713                option_name: option_name.to_ast_string_simple(),
3714                err: e.into(),
3715            };
3716            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3717                val.map(|s| match s {
3718                    WithOptionValue::Value(Value::String(s)) => {
3719                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3720                    }
3721                    _ => Err("must be a string".to_string()),
3722                })
3723                .transpose()
3724                .map_err(PlanError::Unstructured)
3725                .map_err(better_error)
3726            };
3727            match option.name {
3728                CsrConfigOptionName::AvroKeyFullname => {
3729                    extracted.avro_key_fullname =
3730                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3731                }
3732                CsrConfigOptionName::AvroValueFullname => {
3733                    extracted.avro_value_fullname =
3734                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3735                }
3736                CsrConfigOptionName::NullDefaults => {
3737                    extracted.null_defaults =
3738                        <bool>::try_from_value(option.value).map_err(better_error)?;
3739                }
3740                CsrConfigOptionName::AvroDocOn(doc_on) => {
3741                    let value = String::try_from_value(option.value.ok_or_else(|| {
3742                        PlanError::InvalidOptionValue {
3743                            option_name: option_name_str,
3744                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3745                        }
3746                    })?)
3747                    .map_err(better_error)?;
3748                    let key = match doc_on.identifier {
3749                        DocOnIdentifier::Column(ast::ColumnName {
3750                            relation: ResolvedItemName::Item { id, .. },
3751                            column: ResolvedColumnReference::Column { name, index: _ },
3752                        }) => DocTarget::Field {
3753                            object_id: id,
3754                            column_name: name,
3755                        },
3756                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3757                            DocTarget::Type(id)
3758                        }
3759                        _ => unreachable!(),
3760                    };
3761
3762                    match doc_on.for_schema {
3763                        DocOnSchema::KeyOnly => {
3764                            extracted.key_doc_options.insert(key, value);
3765                        }
3766                        DocOnSchema::ValueOnly => {
3767                            extracted.value_doc_options.insert(key, value);
3768                        }
3769                        DocOnSchema::All => {
3770                            common_doc_comments.insert(key, value);
3771                        }
3772                    }
3773                }
3774                CsrConfigOptionName::KeyCompatibilityLevel => {
3775                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3776                }
3777                CsrConfigOptionName::ValueCompatibilityLevel => {
3778                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3779                }
3780            }
3781        }
3782
3783        for (key, value) in common_doc_comments {
3784            if !extracted.key_doc_options.contains_key(&key) {
3785                extracted.key_doc_options.insert(key.clone(), value.clone());
3786            }
3787            if !extracted.value_doc_options.contains_key(&key) {
3788                extracted.value_doc_options.insert(key, value);
3789            }
3790        }
3791        Ok(extracted)
3792    }
3793}
3794
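/// Builds an Iceberg sink connection from a resolved Iceberg catalog
/// connection and an AWS connection. The `TABLE` and `NAMESPACE` options and
/// a `COMMIT INTERVAL` are all required.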
3795fn iceberg_sink_builder(
3796    scx: &StatementContext,
3797    catalog_connection: ResolvedItemName,
3798    aws_connection: ResolvedItemName,
3799    options: Vec<IcebergSinkConfigOption<Aug>>,
3800    relation_key_indices: Option<Vec<usize>>,
3801    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3802    commit_interval: Option<Duration>,
3803) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3804    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3805    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3806    let catalog_connection_id = catalog_connection_item.id();
3807    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3808    let aws_connection_id = aws_connection_item.id();
3809    if !matches!(
3810        catalog_connection_item.connection()?,
3811        Connection::IcebergCatalog(_)
3812    ) {
3813        sql_bail!(
3814            "{} is not an iceberg catalog connection",
3815            scx.catalog
3816                .resolve_full_name(catalog_connection_item.name())
3817                .to_string()
3818                .quoted()
3819        );
3820    };
3821
3822    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3823        sql_bail!(
3824            "{} is not an AWS connection",
3825            scx.catalog
3826                .resolve_full_name(aws_connection_item.name())
3827                .to_string()
3828                .quoted()
3829        );
3830    }
3831
3832    let IcebergSinkConfigOptionExtracted {
3833        table,
3834        namespace,
3835        seen: _,
3836    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3837
3838    let Some(table) = table else {
3839        sql_bail!("Iceberg sink must specify TABLE");
3840    };
3841    let Some(namespace) = namespace else {
3842        sql_bail!("Iceberg sink must specify NAMESPACE");
3843    };
3844    if commit_interval.is_none() {
3845        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3846    }
3847
3848    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3849        catalog_connection_id,
3850        catalog_connection: catalog_connection_id,
3851        aws_connection_id,
3852        aws_connection: aws_connection_id,
3853        table,
3854        namespace,
3855        relation_key_indices,
3856        key_desc_and_indices,
3857    }))
3858}
3859
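/// Builds a Kafka sink connection from the resolved Kafka connection, the
/// statement's options, and its format specifier, deriving the key and value
/// formats and the topic configuration along the way.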
3860fn kafka_sink_builder(
3861    scx: &StatementContext,
3862    connection: ResolvedItemName,
3863    options: Vec<KafkaSinkConfigOption<Aug>>,
3864    format: Option<FormatSpecifier<Aug>>,
3865    relation_key_indices: Option<Vec<usize>>,
3866    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3867    headers_index: Option<usize>,
3868    value_desc: RelationDesc,
3869    envelope: SinkEnvelope,
3870    sink_from: CatalogItemId,
3871    commit_interval: Option<Duration>,
3872) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3873    // Get Kafka connection.
3874    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3875    let connection_id = connection_item.id();
3876    match connection_item.connection()? {
3877        Connection::Kafka(_) => (),
3878        _ => sql_bail!(
3879            "{} is not a kafka connection",
3880            scx.catalog.resolve_full_name(connection_item.name())
3881        ),
3882    };
3883
3884    if commit_interval.is_some() {
3885        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
3886    }
3887
3888    let KafkaSinkConfigOptionExtracted {
3889        topic,
3890        compression_type,
3891        partition_by,
3892        progress_group_id_prefix,
3893        transactional_id_prefix,
3894        legacy_ids,
3895        topic_config,
3896        topic_metadata_refresh_interval,
3897        topic_partition_count,
3898        topic_replication_factor,
3899        seen: _,
3900    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3901
3902    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3903        (Some(_), Some(true)) => {
3904            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3905        }
3906        (None, Some(true)) => KafkaIdStyle::Legacy,
3907        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3908    };
3909
3910    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3911        (Some(_), Some(true)) => {
3912            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3913        }
3914        (None, Some(true)) => KafkaIdStyle::Legacy,
3915        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3916    };
3917
3918    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3919
3920    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3921        // This is a librdkafka-enforced restriction that, if violated,
3922        // would result in a runtime error for the sink.
3923        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3924    }
3925
3926    let assert_positive = |val: Option<i32>, name: &str| {
3927        if let Some(val) = val {
3928            if val <= 0 {
3929                sql_bail!("{} must be a positive integer", name);
3930            }
3931        }
3932        val.map(NonNeg::try_from)
3933            .transpose()
3934            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3935    };
3936    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3937    let topic_replication_factor =
3938        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3939
3940    // Helper closure to parse Avro connection options for format specifiers
3941    // that use Avro for either key or value encoding.
3942    let gen_avro_schema_options = |conn| {
3943        let CsrConnectionAvro {
3944            connection:
3945                CsrConnection {
3946                    connection,
3947                    options,
3948                },
3949            seed,
3950            key_strategy,
3951            value_strategy,
3952        } = conn;
3953        if seed.is_some() {
3954            sql_bail!("SEED option does not make sense with sinks");
3955        }
3956        if key_strategy.is_some() {
3957            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3958        }
3959        if value_strategy.is_some() {
3960            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3961        }
3962
3963        let item = scx.get_item_by_resolved_name(&connection)?;
3964        let csr_connection = match item.connection()? {
3965            Connection::Csr(_) => item.id(),
3966            _ => {
3967                sql_bail!(
3968                    "{} is not a schema registry connection",
3969                    scx.catalog
3970                        .resolve_full_name(item.name())
3971                        .to_string()
3972                        .quoted()
3973                )
3974            }
3975        };
3976        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3977
3978        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3979            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3980        }
3981
3982        if key_desc_and_indices.is_some()
3983            && (extracted_options.avro_key_fullname.is_some()
3984                ^ extracted_options.avro_value_fullname.is_some())
3985        {
3986            sql_bail!(
3987                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3988            );
3989        }
3990
3991        Ok((csr_connection, extracted_options))
3992    };
3993
3994    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3995        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3996        Format::Bytes if desc.arity() == 1 => {
3997            let col_type = &desc.typ().column_types[0].scalar_type;
3998            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3999                bail_unsupported!(format!(
4000                    "BYTES format with non-encodable type: {:?}",
4001                    col_type
4002                ));
4003            }
4004
4005            Ok(KafkaSinkFormatType::Bytes)
4006        }
4007        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
4008        Format::Bytes | Format::Text => {
4009            bail_unsupported!("BYTES or TEXT format with multiple columns")
4010        }
4011        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
4012        Format::Avro(AvroSchema::Csr { csr_connection }) => {
4013            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
4014            let schema = if is_key {
4015                AvroSchemaGenerator::new(
4016                    desc.clone(),
4017                    false,
4018                    options.key_doc_options,
4019                    options.avro_key_fullname.as_deref().unwrap_or("row"),
4020                    options.null_defaults,
4021                    Some(sink_from),
4022                    false,
4023                )?
4024                .schema()
4025                .to_string()
4026            } else {
4027                AvroSchemaGenerator::new(
4028                    desc.clone(),
4029                    matches!(envelope, SinkEnvelope::Debezium),
4030                    options.value_doc_options,
4031                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
4032                    options.null_defaults,
4033                    Some(sink_from),
4034                    true,
4035                )?
4036                .schema()
4037                .to_string()
4038            };
4039            Ok(KafkaSinkFormatType::Avro {
4040                schema,
4041                compatibility_level: if is_key {
4042                    options.key_compatibility_level
4043                } else {
4044                    options.value_compatibility_level
4045                },
4046                csr_connection,
4047            })
4048        }
4049        format => bail_unsupported!(format!("sink format {:?}", format)),
4050    };
4051
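    // Plan the PARTITION BY expression, if any, against the sink's value
    // columns. It must be castable to a 64-bit unsigned integer, and with
    // ENVELOPE DEBEZIUM it may reference only key columns.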
4052    let partition_by = match &partition_by {
4053        Some(partition_by) => {
4054            let mut scope = Scope::from_source(None, value_desc.iter_names());
4055
4056            match envelope {
4057                SinkEnvelope::Upsert => (),
4058                SinkEnvelope::Debezium => {
4059                    let key_indices: HashSet<_> = key_desc_and_indices
4060                        .as_ref()
4061                        .map(|(_desc, indices)| indices.as_slice())
4062                        .unwrap_or_default()
4063                        .into_iter()
4064                        .collect();
4065                    for (i, item) in scope.items.iter_mut().enumerate() {
4066                        if !key_indices.contains(&i) {
4067                            item.error_if_referenced = Some(|_table, column| {
4068                                PlanError::InvalidPartitionByEnvelopeDebezium {
4069                                    column_name: column.to_string(),
4070                                }
4071                            });
4072                        }
4073                    }
4074                }
4075            };
4076
4077            let ecx = &ExprContext {
4078                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
4079                name: "PARTITION BY",
4080                scope: &scope,
4081                relation_type: value_desc.typ(),
4082                allow_aggregates: false,
4083                allow_subqueries: false,
4084                allow_parameters: false,
4085                allow_windows: false,
4086            };
4087            let expr = plan_expr(ecx, partition_by)?.cast_to(
4088                ecx,
4089                CastContext::Assignment,
4090                &SqlScalarType::UInt64,
4091            )?;
4092            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
4093
4094            Some(expr)
4095        }
4096        _ => None,
4097    };
4098
4099    // Map from the format specifier of the statement to the individual key/value formats for the sink.
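        // A bare `FORMAT <f>` applies to both the key (if one exists) and the value,
        // while `KEY FORMAT <k> VALUE FORMAT <v>` specifies each independently.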
4100    let format = match format {
4101        Some(FormatSpecifier::KeyValue { key, value }) => {
4102            let key_format = match key_desc_and_indices.as_ref() {
4103                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4104                None => None,
4105            };
4106            KafkaSinkFormat {
4107                value_format: map_format(value, &value_desc, false)?,
4108                key_format,
4109            }
4110        }
4111        Some(FormatSpecifier::Bare(format)) => {
4112            let key_format = match key_desc_and_indices.as_ref() {
4113                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4114                None => None,
4115            };
4116            KafkaSinkFormat {
4117                value_format: map_format(format, &value_desc, false)?,
4118                key_format,
4119            }
4120        }
4121        None => bail_unsupported!("sink without format"),
4122    };
4123
4124    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4125        connection_id,
4126        connection: connection_id,
4127        format,
4128        topic: topic_name,
4129        relation_key_indices,
4130        key_desc_and_indices,
4131        headers_index,
4132        value_desc,
4133        partition_by,
4134        compression_type,
4135        progress_group_id,
4136        transactional_id,
4137        topic_options: KafkaTopicOptions {
4138            partition_count: topic_partition_count,
4139            replication_factor: topic_replication_factor,
4140            topic_config: topic_config.unwrap_or_default(),
4141        },
4142        topic_metadata_refresh_interval,
4143    }))
4144}
4145
4146pub fn describe_create_index(
4147    _: &StatementContext,
4148    _: CreateIndexStatement<Aug>,
4149) -> Result<StatementDesc, PlanError> {
4150    Ok(StatementDesc::new(None))
4151}
4152
4153pub fn plan_create_index(
4154    scx: &StatementContext,
4155    mut stmt: CreateIndexStatement<Aug>,
4156) -> Result<Plan, PlanError> {
4157    let CreateIndexStatement {
4158        name,
4159        on_name,
4160        in_cluster,
4161        key_parts,
4162        with_options,
4163        if_not_exists,
4164    } = &mut stmt;
4165    let on = scx.get_item_by_resolved_name(on_name)?;
4166
4167    {
4168        use CatalogItemType::*;
4169        match on.item_type() {
4170            Table | Source | View | MaterializedView | ContinualTask => {
4171                if on.replacement_target().is_some() {
4172                    sql_bail!(
4173                        "index cannot be created on {} because it is a replacement {}",
4174                        on_name.full_name_str(),
4175                        on.item_type(),
4176                    );
4177                }
4178            }
4179            Sink | Index | Type | Func | Secret | Connection => {
4180                sql_bail!(
4181                    "index cannot be created on {} because it is a {}",
4182                    on_name.full_name_str(),
4183                    on.item_type(),
4184                );
4185            }
4186        }
4187    }
4188
4189    let on_desc = on.relation_desc().expect("item type checked above");
4190
4191    let filled_key_parts = match key_parts {
4192        Some(kp) => kp.to_vec(),
4193        None => {
4194            // `key_parts` is None if we're creating a "default" index.
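                // The default key comes from the relation type; each key column is
                // referenced by name when it has an unambiguous one, and by its
                // 1-based column position otherwise.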
4195            let key = on_desc.typ().default_key();
4196            key.iter()
4197                .map(|i| match on_desc.get_unambiguous_name(*i) {
4198                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
4199                    _ => Expr::Value(Value::Number((i + 1).to_string())),
4200                })
4201                .collect()
4202        }
4203    };
4204    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4205
4206    let index_name = if let Some(name) = name {
4207        QualifiedItemName {
4208            qualifiers: on.name().qualifiers.clone(),
4209            item: normalize::ident(name.clone()),
4210        }
4211    } else {
4212        let mut idx_name = QualifiedItemName {
4213            qualifiers: on.name().qualifiers.clone(),
4214            item: on.name().item.clone(),
4215        };
4216        if key_parts.is_none() {
4217            // We're trying to create the "default" index.
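                // For example, the default index on a table `t` is named `t_primary_idx`.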
4218            idx_name.item += "_primary_idx";
4219        } else {
4220            // Use the PostgreSQL scheme for automatically naming indexes:
4221            // `<table>_<_-separated indexed expressions>_idx`
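                // For example (hypothetical), `CREATE INDEX ON t (a, b + 1)` yields an
                // index named `t_a_expr_idx`: named columns contribute their names, and
                // any other expression contributes the literal suffix `expr`.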
4222            let index_name_col_suffix = keys
4223                .iter()
4224                .map(|k| match k {
4225                    mz_expr::MirScalarExpr::Column(i, name) => {
4226                        match (on_desc.get_unambiguous_name(*i), &name.0) {
4227                            (Some(col_name), _) => col_name.to_string(),
4228                            (None, Some(name)) => name.to_string(),
4229                            (None, None) => format!("{}", i + 1),
4230                        }
4231                    }
4232                    _ => "expr".to_string(),
4233                })
4234                .join("_");
4235            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4236                .expect("write on strings cannot fail");
4237            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4238        }
4239
4240        if !*if_not_exists {
4241            scx.catalog.find_available_name(idx_name)
4242        } else {
4243            idx_name
4244        }
4245    };
4246
4247    // Check for an object in the catalog with this same name
4248    let full_name = scx.catalog.resolve_full_name(&index_name);
4249    let partial_name = PartialItemName::from(full_name.clone());
4250    // For PostgreSQL compatibility, we need to prevent creating indexes when
4251    // there is an existing object *or* type of the same name.
4252    //
4253    // Technically, we only need to prevent coexistence of indexes and types
4254    // that have an associated relation (record types but not list/map types).
4255    // Enforcing that would be more complicated, though. It's backwards
4256    // compatible to weaken this restriction in the future.
4257    if let (Ok(item), false, false) = (
4258        scx.catalog.resolve_item_or_type(&partial_name),
4259        *if_not_exists,
4260        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4261    ) {
4262        return Err(PlanError::ItemAlreadyExists {
4263            name: full_name.to_string(),
4264            item_type: item.item_type(),
4265        });
4266    }
4267
4268    let options = plan_index_options(scx, with_options.clone())?;
4269    let cluster_id = match in_cluster {
4270        None => scx.resolve_cluster(None)?.id(),
4271        Some(in_cluster) => in_cluster.id,
4272    };
4273
4274    *in_cluster = Some(ResolvedClusterName {
4275        id: cluster_id,
4276        print_name: None,
4277    });
4278
4279    // Normalize `stmt`.
4280    *name = Some(Ident::new(index_name.item.clone())?);
4281    *key_parts = Some(filled_key_parts);
4282    let if_not_exists = *if_not_exists;
4283
4284    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4285    let compaction_window = options.iter().find_map(|o| {
4286        #[allow(irrefutable_let_patterns)]
4287        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4288            Some(lcw.clone())
4289        } else {
4290            None
4291        }
4292    });
4293
4294    Ok(Plan::CreateIndex(CreateIndexPlan {
4295        name: index_name,
4296        index: Index {
4297            create_sql,
4298            on: on.global_id(),
4299            keys,
4300            cluster_id,
4301            compaction_window,
4302        },
4303        if_not_exists,
4304    }))
4305}
4306
4307pub fn describe_create_type(
4308    _: &StatementContext,
4309    _: CreateTypeStatement<Aug>,
4310) -> Result<StatementDesc, PlanError> {
4311    Ok(StatementDesc::new(None))
4312}
4313
4314pub fn plan_create_type(
4315    scx: &StatementContext,
4316    stmt: CreateTypeStatement<Aug>,
4317) -> Result<Plan, PlanError> {
4318    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4319    let CreateTypeStatement { name, as_type, .. } = stmt;
4320
4321    fn validate_data_type(
4322        scx: &StatementContext,
4323        data_type: ResolvedDataType,
4324        as_type: &str,
4325        key: &str,
4326    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4327        let (id, modifiers) = match data_type {
4328            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4329            _ => sql_bail!(
4330                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4331                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4332                as_type,
4333                key,
4334                data_type.human_readable_name(),
4335            ),
4336        };
4337
4338        let item = scx.catalog.get_item(&id);
4339        match item.type_details() {
4340            None => sql_bail!(
4341                "{} must be of class type, but received {} which is of class {}",
4342                key,
4343                scx.catalog.resolve_full_name(item.name()),
4344                item.item_type()
4345            ),
4346            Some(CatalogTypeDetails {
4347                typ: CatalogType::Char,
4348                ..
4349            }) => {
4350                bail_unsupported!("embedding char type in a list or map")
4351            }
4352            _ => {
4353                // Validate that the modifiers are actually valid.
4354                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4355
4356                Ok((id, modifiers))
4357            }
4358        }
4359    }
4360
4361    let inner = match as_type {
4362        CreateTypeAs::List { options } => {
4363            let CreateTypeListOptionExtracted {
4364                element_type,
4365                seen: _,
4366            } = CreateTypeListOptionExtracted::try_from(options)?;
4367            let element_type =
4368                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4369            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4370            CatalogType::List {
4371                element_reference: id,
4372                element_modifiers: modifiers,
4373            }
4374        }
4375        CreateTypeAs::Map { options } => {
4376            let CreateTypeMapOptionExtracted {
4377                key_type,
4378                value_type,
4379                seen: _,
4380            } = CreateTypeMapOptionExtracted::try_from(options)?;
4381            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4382            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4383            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4384            let (value_id, value_modifiers) =
4385                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4386            CatalogType::Map {
4387                key_reference: key_id,
4388                key_modifiers,
4389                value_reference: value_id,
4390                value_modifiers,
4391            }
4392        }
4393        CreateTypeAs::Record { column_defs } => {
4394            let mut fields = vec![];
4395            for column_def in column_defs {
4396                let data_type = column_def.data_type;
4397                let key = ident(column_def.name.clone());
4398                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4399                fields.push(CatalogRecordField {
4400                    name: ColumnName::from(key.clone()),
4401                    type_reference: id,
4402                    type_modifiers: modifiers,
4403                });
4404            }
4405            CatalogType::Record { fields }
4406        }
4407    };
4408
4409    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4410
4411    // Check for an object in the catalog with this same name
4412    let full_name = scx.catalog.resolve_full_name(&name);
4413    let partial_name = PartialItemName::from(full_name.clone());
4414    // For PostgreSQL compatibility, we need to prevent creating types when
4415    // there is an existing object *or* type of the same name.
4416    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4417        if item.item_type().conflicts_with_type() {
4418            return Err(PlanError::ItemAlreadyExists {
4419                name: full_name.to_string(),
4420                item_type: item.item_type(),
4421            });
4422        }
4423    }
4424
4425    Ok(Plan::CreateType(CreateTypePlan {
4426        name,
4427        typ: Type { create_sql, inner },
4428    }))
4429}
4430
4431generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4432
4433generate_extracted_config!(
4434    CreateTypeMapOption,
4435    (KeyType, ResolvedDataType),
4436    (ValueType, ResolvedDataType)
4437);
4438
4439#[derive(Debug)]
4440pub enum PlannedAlterRoleOption {
4441    Attributes(PlannedRoleAttributes),
4442    Variable(PlannedRoleVariable),
4443}
4444
4445#[derive(Debug, Clone)]
4446pub struct PlannedRoleAttributes {
4447    pub inherit: Option<bool>,
4448    pub password: Option<Password>,
4449    pub scram_iterations: Option<NonZeroU32>,
4450    /// `nopassword` is set to true if the password from the parser is None.
4451    /// This is semantically different from not supplying a password at all,
4452    /// and allows for unsetting a password.
4453    pub nopassword: Option<bool>,
4454    pub superuser: Option<bool>,
4455    pub login: Option<bool>,
4456}
4457
4458fn plan_role_attributes(
4459    options: Vec<RoleAttribute>,
4460    scx: &StatementContext,
4461) -> Result<PlannedRoleAttributes, PlanError> {
4462    let mut planned_attributes = PlannedRoleAttributes {
4463        inherit: None,
4464        password: None,
4465        scram_iterations: None,
4466        superuser: None,
4467        login: None,
4468        nopassword: None,
4469    };
4470
4471    for option in options {
4472        match option {
4473            RoleAttribute::Inherit | RoleAttribute::NoInherit
4474                if planned_attributes.inherit.is_some() =>
4475            {
4476                sql_bail!("conflicting or redundant options");
4477            }
4478            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4479                bail_never_supported!(
4480                    "CREATECLUSTER attribute",
4481                    "sql/create-role/#details",
4482                    "Use system privileges instead."
4483                );
4484            }
4485            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4486                bail_never_supported!(
4487                    "CREATEDB attribute",
4488                    "sql/create-role/#details",
4489                    "Use system privileges instead."
4490                );
4491            }
4492            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4493                bail_never_supported!(
4494                    "CREATEROLE attribute",
4495                    "sql/create-role/#details",
4496                    "Use system privileges instead."
4497                );
4498            }
4499            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4500                sql_bail!("conflicting or redundant options");
4501            }
4502
4503            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4504            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4505            RoleAttribute::Password(password) => {
4506                if let Some(password) = password {
4507                    planned_attributes.password = Some(password.into());
4508                    planned_attributes.scram_iterations =
4509                        Some(scx.catalog.system_vars().scram_iterations())
4510                } else {
4511                    planned_attributes.nopassword = Some(true);
4512                }
4513            }
4514            RoleAttribute::SuperUser => {
4515                if planned_attributes.superuser == Some(false) {
4516                    sql_bail!("conflicting or redundant options");
4517                }
4518                planned_attributes.superuser = Some(true);
4519            }
4520            RoleAttribute::NoSuperUser => {
4521                if planned_attributes.superuser == Some(true) {
4522                    sql_bail!("conflicting or redundant options");
4523                }
4524                planned_attributes.superuser = Some(false);
4525            }
4526            RoleAttribute::Login => {
4527                if planned_attributes.login == Some(false) {
4528                    sql_bail!("conflicting or redundant options");
4529                }
4530                planned_attributes.login = Some(true);
4531            }
4532            RoleAttribute::NoLogin => {
4533                if planned_attributes.login == Some(true) {
4534                    sql_bail!("conflicting or redundant options");
4535                }
4536                planned_attributes.login = Some(false);
4537            }
4538        }
4539    }
4540    if planned_attributes.inherit == Some(false) {
4541        bail_unsupported!("non-inherit roles");
4542    }
4543
4544    Ok(planned_attributes)
4545}
4546
4547#[derive(Debug)]
4548pub enum PlannedRoleVariable {
4549    Set { name: String, value: VariableValue },
4550    Reset { name: String },
4551}
4552
4553impl PlannedRoleVariable {
4554    pub fn name(&self) -> &str {
4555        match self {
4556            PlannedRoleVariable::Set { name, .. } => name,
4557            PlannedRoleVariable::Reset { name } => name,
4558        }
4559    }
4560}
4561
4562fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4563    let plan = match variable {
4564        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4565            name: name.to_string(),
4566            value: scl::plan_set_variable_to(value)?,
4567        },
4568        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4569            name: name.to_string(),
4570        },
4571    };
4572    Ok(plan)
4573}
4574
4575pub fn describe_create_role(
4576    _: &StatementContext,
4577    _: CreateRoleStatement,
4578) -> Result<StatementDesc, PlanError> {
4579    Ok(StatementDesc::new(None))
4580}
4581
4582pub fn plan_create_role(
4583    scx: &StatementContext,
4584    CreateRoleStatement { name, options }: CreateRoleStatement,
4585) -> Result<Plan, PlanError> {
4586    let attributes = plan_role_attributes(options, scx)?;
4587    Ok(Plan::CreateRole(CreateRolePlan {
4588        name: normalize::ident(name),
4589        attributes: attributes.into(),
4590    }))
4591}
4592
4593pub fn plan_create_network_policy(
4594    ctx: &StatementContext,
4595    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4596) -> Result<Plan, PlanError> {
4597    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4598    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4599
4600    let Some(rule_defs) = policy_options.rules else {
4601        sql_bail!("RULES must be specified when creating network policies.");
4602    };
4603
4604    let mut rules = vec![];
4605    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4606        let NetworkPolicyRuleOptionExtracted {
4607            seen: _,
4608            direction,
4609            action,
4610            address,
4611        } = options.try_into()?;
4612        let (direction, action, address) = match (direction, action, address) {
4613            (Some(direction), Some(action), Some(address)) => (
4614                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4615                NetworkPolicyRuleAction::try_from(action.as_str())?,
4616                PolicyAddress::try_from(address.as_str())?,
4617            ),
4618            (_, _, _) => {
4619                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4620            }
4621        };
4622        rules.push(NetworkPolicyRule {
4623            name: normalize::ident(name),
4624            direction,
4625            action,
4626            address,
4627        });
4628    }
4629
4630    if rules.len()
4631        > ctx
4632            .catalog
4633            .system_vars()
4634            .max_rules_per_network_policy()
4635            .try_into()?
4636    {
4637        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4638    }
4639
4640    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4641        name: normalize::ident(name),
4642        rules,
4643    }))
4644}
4645
4646pub fn plan_alter_network_policy(
4647    ctx: &StatementContext,
4648    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4649) -> Result<Plan, PlanError> {
4650    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4651
4652    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4653    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4654
4655    let Some(rule_defs) = policy_options.rules else {
4656        sql_bail!("RULES must be specified when altering network policies.");
4657    };
4658
4659    let mut rules = vec![];
4660    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4661        let NetworkPolicyRuleOptionExtracted {
4662            seen: _,
4663            direction,
4664            action,
4665            address,
4666        } = options.try_into()?;
4667
4668        let (direction, action, address) = match (direction, action, address) {
4669            (Some(direction), Some(action), Some(address)) => (
4670                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4671                NetworkPolicyRuleAction::try_from(action.as_str())?,
4672                PolicyAddress::try_from(address.as_str())?,
4673            ),
4674            (_, _, _) => {
4675                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4676            }
4677        };
4678        rules.push(NetworkPolicyRule {
4679            name: normalize::ident(name),
4680            direction,
4681            action,
4682            address,
4683        });
4684    }
4685    if rules.len()
4686        > ctx
4687            .catalog
4688            .system_vars()
4689            .max_rules_per_network_policy()
4690            .try_into()?
4691    {
4692        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4693    }
4694
4695    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4696        id: policy.id(),
4697        name: normalize::ident(name),
4698        rules,
4699    }))
4700}
4701
4702pub fn describe_create_cluster(
4703    _: &StatementContext,
4704    _: CreateClusterStatement<Aug>,
4705) -> Result<StatementDesc, PlanError> {
4706    Ok(StatementDesc::new(None))
4707}
4708
4709// WARNING:
4710// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4711// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4712// that option stays the same. If you were to give a default value here, then not giving that option
4713// to ALTER CLUSTER would always reset the value of that option to the default.
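    // For example (hypothetical), if SIZE had a `Default("25cc")` below, then an
    // `ALTER CLUSTER c SET (REPLICATION FACTOR 2)` that omits SIZE would silently
    // reset the cluster's size to "25cc" instead of leaving it unchanged.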
4714generate_extracted_config!(
4715    ClusterOption,
4716    (AvailabilityZones, Vec<String>),
4717    (Disk, bool),
4718    (IntrospectionDebugging, bool),
4719    (IntrospectionInterval, OptionalDuration),
4720    (Managed, bool),
4721    (Replicas, Vec<ReplicaDefinition<Aug>>),
4722    (ReplicationFactor, u32),
4723    (Size, String),
4724    (Schedule, ClusterScheduleOptionValue),
4725    (WorkloadClass, OptionalString)
4726);
4727
4728generate_extracted_config!(
4729    NetworkPolicyOption,
4730    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4731);
4732
4733generate_extracted_config!(
4734    NetworkPolicyRuleOption,
4735    (Direction, String),
4736    (Action, String),
4737    (Address, String)
4738);
4739
4740generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4741
4742generate_extracted_config!(
4743    ClusterAlterUntilReadyOption,
4744    (Timeout, Duration),
4745    (OnTimeout, String)
4746);
4747
4748generate_extracted_config!(
4749    ClusterFeature,
4750    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4751    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4752    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4753    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4754    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4755    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4756    (
4757        EnableProjectionPushdownAfterRelationCse,
4758        Option<bool>,
4759        Default(None)
4760    )
4761);
4762
4763/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4764///
4765/// The reverse of [`unplan_create_cluster`].
4766pub fn plan_create_cluster(
4767    scx: &StatementContext,
4768    stmt: CreateClusterStatement<Aug>,
4769) -> Result<Plan, PlanError> {
4770    let plan = plan_create_cluster_inner(scx, stmt)?;
4771
4772    // Roundtrip through unplan and make sure that we end up with the same plan.
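        // The check is: plan -> unplan -> SQL text -> parse -> resolve -> plan again;
        // any mismatch means plan_create_cluster_inner and unplan_create_cluster have
        // drifted apart.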
4773    if let CreateClusterVariant::Managed(_) = &plan.variant {
4774        let stmt = unplan_create_cluster(scx, plan.clone())
4775            .map_err(|e| PlanError::Replan(e.to_string()))?;
4776        let create_sql = stmt.to_ast_string_stable();
4777        let stmt = parse::parse(&create_sql)
4778            .map_err(|e| PlanError::Replan(e.to_string()))?
4779            .into_element()
4780            .ast;
4781        let (stmt, _resolved_ids) =
4782            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4783        let stmt = match stmt {
4784            Statement::CreateCluster(stmt) => stmt,
4785            stmt => {
4786                return Err(PlanError::Replan(format!(
4787                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4788                )));
4789            }
4790        };
4791        let replan =
4792            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4793        if plan != replan {
4794            return Err(PlanError::Replan(format!(
4795                "replan does not match: plan={plan:?}, replan={replan:?}"
4796            )));
4797        }
4798    }
4799
4800    Ok(Plan::CreateCluster(plan))
4801}
4802
4803pub fn plan_create_cluster_inner(
4804    scx: &StatementContext,
4805    CreateClusterStatement {
4806        name,
4807        options,
4808        features,
4809    }: CreateClusterStatement<Aug>,
4810) -> Result<CreateClusterPlan, PlanError> {
4811    let ClusterOptionExtracted {
4812        availability_zones,
4813        introspection_debugging,
4814        introspection_interval,
4815        managed,
4816        replicas,
4817        replication_factor,
4818        seen: _,
4819        size,
4820        disk,
4821        schedule,
4822        workload_class,
4823    }: ClusterOptionExtracted = options.try_into()?;
4824
4825    let managed = managed.unwrap_or_else(|| replicas.is_none());
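        // Absent an explicit MANAGED option, a cluster is managed unless REPLICAS
        // was given.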
4826
4827    if !scx.catalog.active_role_id().is_system() {
4828        if !features.is_empty() {
4829            sql_bail!("FEATURES not supported for non-system users");
4830        }
4831        if workload_class.is_some() {
4832            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4833        }
4834    }
4835
4836    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4837    let workload_class = workload_class.and_then(|v| v.0);
4838
4839    if managed {
4840        if replicas.is_some() {
4841            sql_bail!("REPLICAS not supported for managed clusters");
4842        }
4843        let Some(size) = size else {
4844            sql_bail!("SIZE must be specified for managed clusters");
4845        };
4846
4847        if disk.is_some() {
4848            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4849            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4850            // we'll be able to remove the `DISK` option entirely.
4851            if scx.catalog.is_cluster_size_cc(&size) {
4852                sql_bail!(
4853                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4854                );
4855            }
4856
4857            scx.catalog
4858                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4859        }
4860
4861        let compute = plan_compute_replica_config(
4862            introspection_interval,
4863            introspection_debugging.unwrap_or(false),
4864        )?;
4865
4866        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4867            replication_factor.unwrap_or_else(|| {
4868                scx.catalog
4869                    .system_vars()
4870                    .default_cluster_replication_factor()
4871            })
4872        } else {
4873            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4874            if replication_factor.is_some() {
4875                sql_bail!(
4876                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4877                );
4878            }
4879            // If we have a non-trivial schedule, start with no replicas, to avoid
4880            // quickly flip-flopping if the schedule doesn't want a replica right
4881            // away.
4882            0
4883        };
4884        let availability_zones = availability_zones.unwrap_or_default();
4885
4886        if !availability_zones.is_empty() {
4887            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4888        }
4889
4890        // Plan OptimizerFeatureOverrides.
4891        let ClusterFeatureExtracted {
4892            reoptimize_imported_views,
4893            enable_eager_delta_joins,
4894            enable_new_outer_join_lowering,
4895            enable_variadic_left_join_lowering,
4896            enable_letrec_fixpoint_analysis,
4897            enable_join_prioritize_arranged,
4898            enable_projection_pushdown_after_relation_cse,
4899            seen: _,
4900        } = ClusterFeatureExtracted::try_from(features)?;
4901        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4902            reoptimize_imported_views,
4903            enable_eager_delta_joins,
4904            enable_new_outer_join_lowering,
4905            enable_variadic_left_join_lowering,
4906            enable_letrec_fixpoint_analysis,
4907            enable_join_prioritize_arranged,
4908            enable_projection_pushdown_after_relation_cse,
4909            ..Default::default()
4910        };
4911
4912        let schedule = plan_cluster_schedule(schedule)?;
4913
4914        Ok(CreateClusterPlan {
4915            name: normalize::ident(name),
4916            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4917                replication_factor,
4918                size,
4919                availability_zones,
4920                compute,
4921                optimizer_feature_overrides,
4922                schedule,
4923            }),
4924            workload_class,
4925        })
4926    } else {
4927        let Some(replica_defs) = replicas else {
4928            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4929        };
4930        if availability_zones.is_some() {
4931            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4932        }
4933        if replication_factor.is_some() {
4934            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4935        }
4936        if introspection_debugging.is_some() {
4937            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4938        }
4939        if introspection_interval.is_some() {
4940            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4941        }
4942        if size.is_some() {
4943            sql_bail!("SIZE not supported for unmanaged clusters");
4944        }
4945        if disk.is_some() {
4946            sql_bail!("DISK not supported for unmanaged clusters");
4947        }
4948        if !features.is_empty() {
4949            sql_bail!("FEATURES not supported for unmanaged clusters");
4950        }
4951        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4952            sql_bail!(
4953                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4954            );
4955        }
4956
4957        let mut replicas = vec![];
4958        for ReplicaDefinition { name, options } in replica_defs {
4959            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4960        }
4961
4962        Ok(CreateClusterPlan {
4963            name: normalize::ident(name),
4964            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4965            workload_class,
4966        })
4967    }
4968}
4969
4970/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4971///
4972/// The reverse of [`plan_create_cluster`].
4973pub fn unplan_create_cluster(
4974    scx: &StatementContext,
4975    CreateClusterPlan {
4976        name,
4977        variant,
4978        workload_class,
4979    }: CreateClusterPlan,
4980) -> Result<CreateClusterStatement<Aug>, PlanError> {
4981    match variant {
4982        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4983            replication_factor,
4984            size,
4985            availability_zones,
4986            compute,
4987            optimizer_feature_overrides,
4988            schedule,
4989        }) => {
4990            let schedule = unplan_cluster_schedule(schedule);
4991            let OptimizerFeatureOverrides {
4992                enable_guard_subquery_tablefunc: _,
4993                enable_consolidate_after_union_negate: _,
4994                enable_reduce_mfp_fusion: _,
4995                enable_cardinality_estimates: _,
4996                persist_fast_path_limit: _,
4997                reoptimize_imported_views,
4998                enable_eager_delta_joins,
4999                enable_new_outer_join_lowering,
5000                enable_variadic_left_join_lowering,
5001                enable_letrec_fixpoint_analysis,
5002                enable_reduce_reduction: _,
5003                enable_join_prioritize_arranged,
5004                enable_projection_pushdown_after_relation_cse,
5005                enable_less_reduce_in_eqprop: _,
5006                enable_dequadratic_eqprop_map: _,
5007                enable_eq_classes_withholding_errors: _,
5008                enable_fast_path_plan_insights: _,
5009                enable_cast_elimination: _,
5010            } = optimizer_feature_overrides;
5011            // Any field destructured above but not listed below is not wired up to cluster features.
5012            let features_extracted = ClusterFeatureExtracted {
5013                // Seen is ignored when unplanning.
5014                seen: Default::default(),
5015                reoptimize_imported_views,
5016                enable_eager_delta_joins,
5017                enable_new_outer_join_lowering,
5018                enable_variadic_left_join_lowering,
5019                enable_letrec_fixpoint_analysis,
5020                enable_join_prioritize_arranged,
5021                enable_projection_pushdown_after_relation_cse,
5022            };
5023            let features = features_extracted.into_values(scx.catalog);
5024            let availability_zones = if availability_zones.is_empty() {
5025                None
5026            } else {
5027                Some(availability_zones)
5028            };
5029            let (introspection_interval, introspection_debugging) =
5030                unplan_compute_replica_config(compute);
5031            // Replication factor cannot be explicitly specified with a refresh schedule; it's
5032            // always 1 or less.
5033            let replication_factor = match &schedule {
5034                ClusterScheduleOptionValue::Manual => Some(replication_factor),
5035                ClusterScheduleOptionValue::Refresh { .. } => {
5036                    assert!(
5037                        replication_factor <= 1,
5038                        "replication factor, {replication_factor:?}, must be <= 1"
5039                    );
5040                    None
5041                }
5042            };
5043            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
5044            let options_extracted = ClusterOptionExtracted {
5045                // Seen is ignored when unplanning.
5046                seen: Default::default(),
5047                availability_zones,
5048                disk: None,
5049                introspection_debugging: Some(introspection_debugging),
5050                introspection_interval,
5051                managed: Some(true),
5052                replicas: None,
5053                replication_factor,
5054                size: Some(size),
5055                schedule: Some(schedule),
5056                workload_class,
5057            };
5058            let options = options_extracted.into_values(scx.catalog);
5059            let name = Ident::new_unchecked(name);
5060            Ok(CreateClusterStatement {
5061                name,
5062                options,
5063                features,
5064            })
5065        }
5066        CreateClusterVariant::Unmanaged(_) => {
5067            bail_unsupported!("SHOW CREATE for unmanaged clusters")
5068        }
5069    }
5070}
5071
5072generate_extracted_config!(
5073    ReplicaOption,
5074    (AvailabilityZone, String),
5075    (BilledAs, String),
5076    (ComputeAddresses, Vec<String>),
5077    (ComputectlAddresses, Vec<String>),
5078    (Disk, bool),
5079    (Internal, bool, Default(false)),
5080    (IntrospectionDebugging, bool, Default(false)),
5081    (IntrospectionInterval, OptionalDuration),
5082    (Size, String),
5083    (StorageAddresses, Vec<String>),
5084    (StoragectlAddresses, Vec<String>),
5085    (Workers, u16)
5086);
5087
5088fn plan_replica_config(
5089    scx: &StatementContext,
5090    options: Vec<ReplicaOption<Aug>>,
5091) -> Result<ReplicaConfig, PlanError> {
5092    let ReplicaOptionExtracted {
5093        availability_zone,
5094        billed_as,
5095        computectl_addresses,
5096        disk,
5097        internal,
5098        introspection_debugging,
5099        introspection_interval,
5100        size,
5101        storagectl_addresses,
5102        ..
5103    }: ReplicaOptionExtracted = options.try_into()?;
5104
5105    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
5106
5107    match (
5108        size,
5109        availability_zone,
5110        billed_as,
5111        storagectl_addresses,
5112        computectl_addresses,
5113    ) {
5114        // Common cases we expect end users to hit.
5115        (None, _, None, None, None) => {
5116            // We don't mention the unmanaged options in the error message
5117            // because they are only available in unsafe mode.
5118            sql_bail!("SIZE option must be specified");
5119        }
5120        (Some(size), availability_zone, billed_as, None, None) => {
5121            if disk.is_some() {
5122                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
5123                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
5124                // we'll be able to remove the `DISK` option entirely.
5125                if scx.catalog.is_cluster_size_cc(&size) {
5126                    sql_bail!(
5127                        "DISK option not supported for modern cluster sizes because disk is always enabled"
5128                    );
5129                }
5130
5131                scx.catalog
5132                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5133            }
5134
5135            Ok(ReplicaConfig::Orchestrated {
5136                size,
5137                availability_zone,
5138                compute,
5139                billed_as,
5140                internal,
5141            })
5142        }
5143
5144        (None, None, None, storagectl_addresses, computectl_addresses) => {
5145            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5146
5147            // When manually testing Materialize in unsafe mode, it's easy to
5148            // accidentally omit one of these options, so we try to produce
5149            // helpful error messages.
5150            let Some(storagectl_addrs) = storagectl_addresses else {
5151                sql_bail!("missing STORAGECTL ADDRESSES option");
5152            };
5153            let Some(computectl_addrs) = computectl_addresses else {
5154                sql_bail!("missing COMPUTECTL ADDRESSES option");
5155            };
5156
5157            if storagectl_addrs.len() != computectl_addrs.len() {
5158                sql_bail!(
5159                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5160                );
5161            }
5162
5163            if disk.is_some() {
5164                sql_bail!("DISK can't be specified for unorchestrated clusters");
5165            }
5166
5167            Ok(ReplicaConfig::Unorchestrated {
5168                storagectl_addrs,
5169                computectl_addrs,
5170                compute,
5171            })
5172        }
5173        _ => {
5174            // We don't bother trying to produce a more helpful error message
5175            // here because no user is likely to hit this path.
5176            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5177        }
5178    }
5179}
5180
5181/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5182///
5183/// The reverse of [`unplan_compute_replica_config`].
5184fn plan_compute_replica_config(
5185    introspection_interval: Option<OptionalDuration>,
5186    introspection_debugging: bool,
5187) -> Result<ComputeReplicaConfig, PlanError> {
5188    let introspection_interval = introspection_interval
5189        .map(|OptionalDuration(i)| i)
5190        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
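        // Omitting INTROSPECTION INTERVAL falls back to the default logging interval;
        // an interval of `None` disables introspection, in which case INTROSPECTION
        // DEBUGGING is rejected below.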
5191    let introspection = match introspection_interval {
5192        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5193            interval,
5194            debugging: introspection_debugging,
5195        }),
5196        None if introspection_debugging => {
5197            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5198        }
5199        None => None,
5200    };
5201    let compute = ComputeReplicaConfig { introspection };
5202    Ok(compute)
5203}
5204
5205/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5206///
5207/// The reverse of [`plan_compute_replica_config`].
5208fn unplan_compute_replica_config(
5209    compute_replica_config: ComputeReplicaConfig,
5210) -> (Option<OptionalDuration>, bool) {
5211    match compute_replica_config.introspection {
5212        Some(ComputeReplicaIntrospectionConfig {
5213            debugging,
5214            interval,
5215        }) => (Some(OptionalDuration(Some(interval))), debugging),
5216        None => (Some(OptionalDuration(None)), false),
5217    }
5218}
5219
5220/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5221///
5222/// The reverse of [`unplan_cluster_schedule`].
5223fn plan_cluster_schedule(
5224    schedule: ClusterScheduleOptionValue,
5225) -> Result<ClusterSchedule, PlanError> {
5226    Ok(match schedule {
5227        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5228        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5229        ClusterScheduleOptionValue::Refresh {
5230            hydration_time_estimate: None,
5231        } => ClusterSchedule::Refresh {
5232            hydration_time_estimate: Duration::from_millis(0),
5233        },
5234        // Otherwise we convert the `IntervalValue` to a `Duration`.
5235        ClusterScheduleOptionValue::Refresh {
5236            hydration_time_estimate: Some(interval_value),
5237        } => {
5238            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5239            if interval.as_microseconds() < 0 {
5240                sql_bail!(
5241                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5242                    interval
5243                );
5244            }
5245            if interval.months != 0 {
5246                // This limitation exists because we want this interval to be cleanly
5247                // convertible to a Unix epoch timestamp difference. That no longer holds
5248                // when the interval involves months, because months have variable lengths.
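                    // For example, a `HYDRATION TIME ESTIMATE` of '1 month' is rejected
                    // here, while the fixed-length '30 days' is accepted.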
5249                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5250            }
5251            let duration = interval.duration()?;
5252            if u64::try_from(duration.as_millis()).is_err()
5253                || Interval::from_duration(&duration).is_err()
5254            {
5255                sql_bail!("HYDRATION TIME ESTIMATE too large");
5256            }
5257            ClusterSchedule::Refresh {
5258                hydration_time_estimate: duration,
5259            }
5260        }
5261    })
5262}
5263
5264/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5265///
5266/// The reverse of [`plan_cluster_schedule`].
5267fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5268    match schedule {
5269        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5270        ClusterSchedule::Refresh {
5271            hydration_time_estimate,
5272        } => {
5273            let interval = Interval::from_duration(&hydration_time_estimate)
5274                .expect("planning ensured that this is convertible back to Interval");
5275            let interval_value = literal::unplan_interval(&interval);
5276            ClusterScheduleOptionValue::Refresh {
5277                hydration_time_estimate: Some(interval_value),
5278            }
5279        }
5280    }
5281}
5282
5283pub fn describe_create_cluster_replica(
5284    _: &StatementContext,
5285    _: CreateClusterReplicaStatement<Aug>,
5286) -> Result<StatementDesc, PlanError> {
5287    Ok(StatementDesc::new(None))
5288}
5289
5290pub fn plan_create_cluster_replica(
5291    scx: &StatementContext,
5292    CreateClusterReplicaStatement {
5293        definition: ReplicaDefinition { name, options },
5294        of_cluster,
5295    }: CreateClusterReplicaStatement<Aug>,
5296) -> Result<Plan, PlanError> {
5297    let cluster = scx
5298        .catalog
5299        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5300    let current_replica_count = cluster.replica_ids().iter().count();
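        // Clusters hosting storage objects (sources/sinks) are currently limited to a
        // single replica, so refuse to add another replica to such a cluster.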
5301    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5302        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5303        return Err(PlanError::CreateReplicaFailStorageObjects {
5304            current_replica_count,
5305            internal_replica_count,
5306            hypothetical_replica_count: current_replica_count + 1,
5307        });
5308    }
5309
5310    let config = plan_replica_config(scx, options)?;
5311
5312    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5313        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5314            return Err(PlanError::MangedReplicaName(name.into_string()));
5315        }
5316    } else {
5317        ensure_cluster_is_not_managed(scx, cluster.id())?;
5318    }
5319
5320    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5321        name: normalize::ident(name),
5322        cluster_id: cluster.id(),
5323        config,
5324    }))
5325}
5326
5327pub fn describe_create_secret(
5328    _: &StatementContext,
5329    _: CreateSecretStatement<Aug>,
5330) -> Result<StatementDesc, PlanError> {
5331    Ok(StatementDesc::new(None))
5332}
5333
5334pub fn plan_create_secret(
5335    scx: &StatementContext,
5336    stmt: CreateSecretStatement<Aug>,
5337) -> Result<Plan, PlanError> {
5338    let CreateSecretStatement {
5339        name,
5340        if_not_exists,
5341        value,
5342    } = &stmt;
5343
5344    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5345    let mut create_sql_statement = stmt.clone();
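        // Redact the secret value in the persisted `create_sql` so the plaintext
        // never reaches the catalog.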
5346    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5347    let create_sql =
5348        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5349    let secret_as = query::plan_secret_as(scx, value.clone())?;
5350
5351    let secret = Secret {
5352        create_sql,
5353        secret_as,
5354    };
5355
5356    Ok(Plan::CreateSecret(CreateSecretPlan {
5357        name,
5358        secret,
5359        if_not_exists: *if_not_exists,
5360    }))
5361}
5362
5363pub fn describe_create_connection(
5364    _: &StatementContext,
5365    _: CreateConnectionStatement<Aug>,
5366) -> Result<StatementDesc, PlanError> {
5367    Ok(StatementDesc::new(None))
5368}
5369
5370generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5371
5372pub fn plan_create_connection(
5373    scx: &StatementContext,
5374    mut stmt: CreateConnectionStatement<Aug>,
5375) -> Result<Plan, PlanError> {
5376    let CreateConnectionStatement {
5377        name,
5378        connection_type,
5379        values,
5380        if_not_exists,
5381        with_options,
5382    } = stmt.clone();
5383    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5384    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5385    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5386
5387    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5388    if options.validate.is_some() {
5389        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5390    }
5391    let validate = match options.validate {
5392        Some(val) => val,
5393        None => {
5394            scx.catalog
5395                .system_vars()
5396                .enable_default_connection_validation()
5397                && details.to_connection().validate_by_default()
5398        }
5399    };
5400
5401    // Check for an object in the catalog with this same name
5402    let full_name = scx.catalog.resolve_full_name(&name);
5403    let partial_name = PartialItemName::from(full_name.clone());
5404    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5405        return Err(PlanError::ItemAlreadyExists {
5406            name: full_name.to_string(),
5407            item_type: item.item_type(),
5408        });
5409    }
5410
5411    // For SSH connections, overwrite the public key options based on the
5412    // connection details, in case we generated new keys during planning.
5413    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5414        stmt.values.retain(|v| {
5415            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5416        });
5417        stmt.values.push(ConnectionOption {
5418            name: ConnectionOptionName::PublicKey1,
5419            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5420        });
5421        stmt.values.push(ConnectionOption {
5422            name: ConnectionOptionName::PublicKey2,
5423            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5424        });
5425    }
5426    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5427
5428    let plan = CreateConnectionPlan {
5429        name,
5430        if_not_exists,
5431        connection: crate::plan::Connection {
5432            create_sql,
5433            details,
5434        },
5435        validate,
5436    };
5437    Ok(Plan::CreateConnection(plan))
5438}
5439
5440fn plan_drop_database(
5441    scx: &StatementContext,
5442    if_exists: bool,
5443    name: &UnresolvedDatabaseName,
5444    cascade: bool,
5445) -> Result<Option<DatabaseId>, PlanError> {
5446    Ok(match resolve_database(scx, name, if_exists)? {
5447        Some(database) => {
5448            if !cascade && database.has_schemas() {
5449                sql_bail!(
5450                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5451                    name,
5452                );
5453            }
5454            Some(database.id())
5455        }
5456        None => None,
5457    })
5458}
5459
5460pub fn describe_drop_objects(
5461    _: &StatementContext,
5462    _: DropObjectsStatement,
5463) -> Result<StatementDesc, PlanError> {
5464    Ok(StatementDesc::new(None))
5465}
5466
5467pub fn plan_drop_objects(
5468    scx: &mut StatementContext,
5469    DropObjectsStatement {
5470        object_type,
5471        if_exists,
5472        names,
5473        cascade,
5474    }: DropObjectsStatement,
5475) -> Result<Plan, PlanError> {
5476    assert_ne!(
5477        object_type,
5478        mz_sql_parser::ast::ObjectType::Func,
5479        "rejected in parser"
5480    );
5481    let object_type = object_type.into();
5482
5483    let mut referenced_ids = Vec::new();
5484    for name in names {
5485        let id = match &name {
5486            UnresolvedObjectName::Cluster(name) => {
5487                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5488            }
5489            UnresolvedObjectName::ClusterReplica(name) => {
5490                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5491            }
5492            UnresolvedObjectName::Database(name) => {
5493                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5494            }
5495            UnresolvedObjectName::Schema(name) => {
5496                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5497            }
5498            UnresolvedObjectName::Role(name) => {
5499                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5500            }
5501            UnresolvedObjectName::Item(name) => {
5502                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5503                    .map(ObjectId::Item)
5504            }
5505            UnresolvedObjectName::NetworkPolicy(name) => {
5506                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5507            }
5508        };
5509        match id {
5510            Some(id) => referenced_ids.push(id),
5511            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5512                name: name.to_ast_string_simple(),
5513                object_type,
5514            }),
5515        }
5516    }
5517    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5518
5519    Ok(Plan::DropObjects(DropObjectsPlan {
5520        referenced_ids,
5521        drop_ids,
5522        object_type,
5523    }))
5524}
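
// Illustrative sketch (hypothetical table names): each name above is resolved
// independently, so with IF EXISTS a missing object only produces an
// `ObjectDoesNotExist` notice while the remaining names are still dropped:
//
//   DROP TABLE IF EXISTS t_present, t_missing;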
5525
5526fn plan_drop_schema(
5527    scx: &StatementContext,
5528    if_exists: bool,
5529    name: &UnresolvedSchemaName,
5530    cascade: bool,
5531) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5532    // Special case for mz_temp: with lazy temporary schema creation, the temp
5533    // schema may not exist yet, but we still need to return the correct error.
5534    // Check the schema name directly against MZ_TEMP_SCHEMA.
5535    let normalized = normalize::unresolved_schema_name(name.clone())?;
5536    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5537        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5538    }
5539
5540    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5541        Some((database_spec, schema_spec)) => {
5542            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5543                sql_bail!(
5544                    "cannot drop schema {name} because it is required by the database system",
5545                );
5546            }
5547            if let SchemaSpecifier::Temporary = schema_spec {
5548                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5549            }
5550            let schema = scx.get_schema(&database_spec, &schema_spec);
5551            if !cascade && schema.has_items() {
5552                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5553                sql_bail!(
5554                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5555                    full_schema_name
5556                );
5557            }
5558            Some((database_spec, schema_spec))
5559        }
5560        None => None,
5561    })
5562}
5563
5564fn plan_drop_role(
5565    scx: &StatementContext,
5566    if_exists: bool,
5567    name: &Ident,
5568) -> Result<Option<RoleId>, PlanError> {
5569    match scx.catalog.resolve_role(name.as_str()) {
5570        Ok(role) => {
5571            let id = role.id();
5572            if &id == scx.catalog.active_role_id() {
5573                sql_bail!("current role cannot be dropped");
5574            }
5575            for role in scx.catalog.get_roles() {
5576                for (member_id, grantor_id) in role.membership() {
5577                    if &id == grantor_id {
5578                        let member_role = scx.catalog.get_role(member_id);
5579                        sql_bail!(
5580                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
5581                            name.as_str(),
5582                            role.name(),
5583                            member_role.name()
5584                        );
5585                    }
5586                }
5587            }
5588            Ok(Some(role.id()))
5589        }
5590        Err(_) if if_exists => Ok(None),
5591        Err(e) => Err(e.into()),
5592    }
5593}
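
// Illustrative sketch (hypothetical role names): a role that acted as the grantor
// of another role's membership cannot be dropped while that membership exists:
//
//   GRANT data_readers TO alice;  -- executed while `granter` is the active role
//   DROP ROLE granter;            -- errors until that membership is revoked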
5594
5595fn plan_drop_cluster(
5596    scx: &StatementContext,
5597    if_exists: bool,
5598    name: &Ident,
5599    cascade: bool,
5600) -> Result<Option<ClusterId>, PlanError> {
5601    Ok(match resolve_cluster(scx, name, if_exists)? {
5602        Some(cluster) => {
5603            if !cascade && !cluster.bound_objects().is_empty() {
5604                return Err(PlanError::DependentObjectsStillExist {
5605                    object_type: "cluster".to_string(),
5606                    object_name: cluster.name().to_string(),
5607                    dependents: Vec::new(),
5608                });
5609            }
5610            Some(cluster.id())
5611        }
5612        None => None,
5613    })
5614}
5615
5616fn plan_drop_network_policy(
5617    scx: &StatementContext,
5618    if_exists: bool,
5619    name: &Ident,
5620) -> Result<Option<NetworkPolicyId>, PlanError> {
5621    match scx.catalog.resolve_network_policy(name.as_str()) {
5622        Ok(policy) => {
5623            // TODO(network_policy): When we support role based network policies, check if any role
5624            // currently has the specified policy set.
5625            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5626                Err(PlanError::NetworkPolicyInUse)
5627            } else {
5628                Ok(Some(policy.id()))
5629            }
5630        }
5631        Err(_) if if_exists => Ok(None),
5632        Err(e) => Err(e.into()),
5633    }
5634}
5635
5636/// Returns `true` if the cluster has any object that requires a single replica.
5637/// Returns `false` if the cluster has no objects.
5638fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5639    // If this feature is enabled then all objects support multiple-replicas
5640    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5641        false
5642    } else {
5643        // Otherwise we check for the existence of sources or sinks.
5644        cluster.bound_objects().iter().any(|id| {
5645            let item = scx.catalog.get_item(id);
5646            matches!(
5647                item.item_type(),
5648                CatalogItemType::Sink | CatalogItemType::Source
5649            )
5650        })
5651    }
5652}
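
// Illustrative sketch: with ENABLE_MULTI_REPLICA_SOURCES disabled, a cluster that
// hosts a source or sink is limited to a single replica, so e.g. a hypothetical
// `ALTER CLUSTER c SET (REPLICATION FACTOR 2)` against such a cluster is rejected
// by the replication-factor checks in `plan_alter_cluster` below.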
5653
5654fn plan_drop_cluster_replica(
5655    scx: &StatementContext,
5656    if_exists: bool,
5657    name: &QualifiedReplica,
5658) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5659    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5660    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5661}
5662
5663/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5664fn plan_drop_item(
5665    scx: &StatementContext,
5666    object_type: ObjectType,
5667    if_exists: bool,
5668    name: UnresolvedItemName,
5669    cascade: bool,
5670) -> Result<Option<CatalogItemId>, PlanError> {
5671    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5672        Ok(r) => r,
5673        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5674        Err(PlanError::MismatchedObjectType {
5675            name,
5676            is_type: ObjectType::MaterializedView,
5677            expected_type: ObjectType::View,
5678        }) => {
5679            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5680        }
5681        e => e?,
5682    };
5683
5684    Ok(match resolved {
5685        Some(catalog_item) => {
5686            if catalog_item.id().is_system() {
5687                sql_bail!(
5688                    "cannot drop {} {} because it is required by the database system",
5689                    catalog_item.item_type(),
5690                    scx.catalog.minimal_qualification(catalog_item.name()),
5691                );
5692            }
5693
5694            if !cascade {
5695                for id in catalog_item.used_by() {
5696                    let dep = scx.catalog.get_item(id);
5697                    if dependency_prevents_drop(object_type, dep) {
5698                        return Err(PlanError::DependentObjectsStillExist {
5699                            object_type: catalog_item.item_type().to_string(),
5700                            object_name: scx
5701                                .catalog
5702                                .minimal_qualification(catalog_item.name())
5703                                .to_string(),
5704                            dependents: vec![(
5705                                dep.item_type().to_string(),
5706                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5707                            )],
5708                        });
5709                    }
5710                }
5711                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5712                //  relies on this entry. Unfortunately, we don't have that information readily available.
5713            }
5714            Some(catalog_item.id())
5715        }
5716        None => None,
5717    })
5718}
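
// Illustrative sketch: the special case above yields a friendlier error when the
// object type is close but wrong, e.g. for a materialized view `mv`:
//
//   DROP VIEW mv;               -- errors with a materialized-view-specific message
//   DROP MATERIALIZED VIEW mv;  -- succeeds, subject to the dependency checks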
5719
5720/// Does the dependency `dep` prevent a non-CASCADE drop of an object of type `object_type`?
5721fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5722    match object_type {
5723        ObjectType::Type => true,
5724        _ => match dep.item_type() {
5725            CatalogItemType::Func
5726            | CatalogItemType::Table
5727            | CatalogItemType::Source
5728            | CatalogItemType::View
5729            | CatalogItemType::MaterializedView
5730            | CatalogItemType::Sink
5731            | CatalogItemType::Type
5732            | CatalogItemType::Secret
5733            | CatalogItemType::Connection
5734            | CatalogItemType::ContinualTask => true,
5735            CatalogItemType::Index => false,
5736        },
5737    }
5738}
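
// Illustrative sketch: indexes never block a non-CASCADE drop, while most other
// dependents do. For a table `t` with both an index and a view defined on it:
//
//   DROP TABLE t;          -- errors: the dependent view still exists
//   DROP TABLE t CASCADE;  -- drops `t`, the index, and the view
//
// With only the index as a dependent, the plain `DROP TABLE t` would succeed.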
5739
5740pub fn describe_alter_index_options(
5741    _: &StatementContext,
5742    _: AlterIndexStatement<Aug>,
5743) -> Result<StatementDesc, PlanError> {
5744    Ok(StatementDesc::new(None))
5745}
5746
5747pub fn describe_drop_owned(
5748    _: &StatementContext,
5749    _: DropOwnedStatement<Aug>,
5750) -> Result<StatementDesc, PlanError> {
5751    Ok(StatementDesc::new(None))
5752}
5753
5754pub fn plan_drop_owned(
5755    scx: &StatementContext,
5756    drop: DropOwnedStatement<Aug>,
5757) -> Result<Plan, PlanError> {
5758    let cascade = drop.cascade();
5759    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5760    let mut drop_ids = Vec::new();
5761    let mut privilege_revokes = Vec::new();
5762    let mut default_privilege_revokes = Vec::new();
5763
5764    fn update_privilege_revokes(
5765        object_id: SystemObjectId,
5766        privileges: &PrivilegeMap,
5767        role_ids: &BTreeSet<RoleId>,
5768        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5769    ) {
5770        privilege_revokes.extend(iter::zip(
5771            iter::repeat(object_id),
5772            privileges
5773                .all_values()
5774                .filter(|privilege| role_ids.contains(&privilege.grantee))
5775                .cloned(),
5776        ));
5777    }
5778
5779    // Replicas
5780    for replica in scx.catalog.get_cluster_replicas() {
5781        if role_ids.contains(&replica.owner_id()) {
5782            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5783        }
5784    }
5785
5786    // Clusters
5787    for cluster in scx.catalog.get_clusters() {
5788        if role_ids.contains(&cluster.owner_id()) {
5789            // Note: CASCADE is not required for replicas.
5790            if !cascade {
5791                let non_owned_bound_objects: Vec<_> = cluster
5792                    .bound_objects()
5793                    .into_iter()
5794                    .map(|item_id| scx.catalog.get_item(item_id))
5795                    .filter(|item| !role_ids.contains(&item.owner_id()))
5796                    .collect();
5797                if !non_owned_bound_objects.is_empty() {
5798                    let names: Vec<_> = non_owned_bound_objects
5799                        .into_iter()
5800                        .map(|item| {
5801                            (
5802                                item.item_type().to_string(),
5803                                scx.catalog.resolve_full_name(item.name()).to_string(),
5804                            )
5805                        })
5806                        .collect();
5807                    return Err(PlanError::DependentObjectsStillExist {
5808                        object_type: "cluster".to_string(),
5809                        object_name: cluster.name().to_string(),
5810                        dependents: names,
5811                    });
5812                }
5813            }
5814            drop_ids.push(cluster.id().into());
5815        }
5816        update_privilege_revokes(
5817            SystemObjectId::Object(cluster.id().into()),
5818            cluster.privileges(),
5819            &role_ids,
5820            &mut privilege_revokes,
5821        );
5822    }
5823
5824    // Items
5825    for item in scx.catalog.get_items() {
5826        if role_ids.contains(&item.owner_id()) {
5827            if !cascade {
5828                // Checks if any items still depend on this one, returning an error if so.
5829                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5830                    let non_owned_dependencies: Vec<_> = used_by
5831                        .into_iter()
5832                        .map(|item_id| scx.catalog.get_item(item_id))
5833                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5834                        .filter(|item| !role_ids.contains(&item.owner_id()))
5835                        .collect();
5836                    if !non_owned_dependencies.is_empty() {
5837                        let names: Vec<_> = non_owned_dependencies
5838                            .into_iter()
5839                            .map(|item| {
5840                                let item_typ = item.item_type().to_string();
5841                                let item_name =
5842                                    scx.catalog.resolve_full_name(item.name()).to_string();
5843                                (item_typ, item_name)
5844                            })
5845                            .collect();
5846                        Err(PlanError::DependentObjectsStillExist {
5847                            object_type: item.item_type().to_string(),
5848                            object_name: scx
5849                                .catalog
5850                                .resolve_full_name(item.name())
5851                                .to_string(),
5853                            dependents: names,
5854                        })
5855                    } else {
5856                        Ok(())
5857                    }
5858                };
5859
5860                // When this item gets dropped it will also drop its progress source, so we
5861                // need to check the users of that progress source as well.
5862                if let Some(id) = item.progress_id() {
5863                    let progress_item = scx.catalog.get_item(&id);
5864                    check_if_dependents_exist(progress_item.used_by())?;
5865                }
5866                check_if_dependents_exist(item.used_by())?;
5867            }
5868            drop_ids.push(item.id().into());
5869        }
5870        update_privilege_revokes(
5871            SystemObjectId::Object(item.id().into()),
5872            item.privileges(),
5873            &role_ids,
5874            &mut privilege_revokes,
5875        );
5876    }
5877
5878    // Schemas
5879    for schema in scx.catalog.get_schemas() {
5880        if !schema.id().is_temporary() {
5881            if role_ids.contains(&schema.owner_id()) {
5882                if !cascade {
5883                    let non_owned_dependencies: Vec<_> = schema
5884                        .item_ids()
5885                        .map(|item_id| scx.catalog.get_item(&item_id))
5886                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5887                        .filter(|item| !role_ids.contains(&item.owner_id()))
5888                        .collect();
5889                    if !non_owned_dependencies.is_empty() {
5890                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5891                        sql_bail!(
5892                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5893                            full_schema_name.to_string().quoted()
5894                        );
5895                    }
5896                }
5897                drop_ids.push((*schema.database(), *schema.id()).into())
5898            }
5899            update_privilege_revokes(
5900                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5901                schema.privileges(),
5902                &role_ids,
5903                &mut privilege_revokes,
5904            );
5905        }
5906    }
5907
5908    // Databases
5909    for database in scx.catalog.get_databases() {
5910        if role_ids.contains(&database.owner_id()) {
5911            if !cascade {
5912                let non_owned_schemas: Vec<_> = database
5913                    .schemas()
5914                    .into_iter()
5915                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5916                    .collect();
5917                if !non_owned_schemas.is_empty() {
5918                    sql_bail!(
5919                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5920                        database.name().quoted(),
5921                    );
5922                }
5923            }
5924            drop_ids.push(database.id().into());
5925        }
5926        update_privilege_revokes(
5927            SystemObjectId::Object(database.id().into()),
5928            database.privileges(),
5929            &role_ids,
5930            &mut privilege_revokes,
5931        );
5932    }
5933
5934    // System
5935    update_privilege_revokes(
5936        SystemObjectId::System,
5937        scx.catalog.get_system_privileges(),
5938        &role_ids,
5939        &mut privilege_revokes,
5940    );
5941
5942    for (default_privilege_object, default_privilege_acl_items) in
5943        scx.catalog.get_default_privileges()
5944    {
5945        for default_privilege_acl_item in default_privilege_acl_items {
5946            if role_ids.contains(&default_privilege_object.role_id)
5947                || role_ids.contains(&default_privilege_acl_item.grantee)
5948            {
5949                default_privilege_revokes.push((
5950                    default_privilege_object.clone(),
5951                    default_privilege_acl_item.clone(),
5952                ));
5953            }
5954        }
5955    }
5956
5957    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5958
5959    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5960    if !system_ids.is_empty() {
5961        let mut owners = system_ids
5962            .into_iter()
5963            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5964            .collect::<BTreeSet<_>>()
5965            .into_iter()
5966            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5967        sql_bail!(
5968            "cannot drop objects owned by role {} because they are required by the database system",
5969            owners.join(", "),
5970        );
5971    }
5972
5973    Ok(Plan::DropOwned(DropOwnedPlan {
5974        role_ids: role_ids.into_iter().collect(),
5975        drop_ids,
5976        privilege_revokes,
5977        default_privilege_revokes,
5978    }))
5979}
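
// Illustrative sketch (hypothetical role name): `DROP OWNED` walks replicas,
// clusters, items, schemas, and databases in the order above, so e.g.
//
//   DROP OWNED BY staging_owner CASCADE;
//
// drops every object owned by `staging_owner` and revokes the privileges and
// default privileges granted to it.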
5980
5981fn plan_retain_history_option(
5982    scx: &StatementContext,
5983    retain_history: Option<OptionalDuration>,
5984) -> Result<Option<CompactionWindow>, PlanError> {
5985    if let Some(OptionalDuration(lcw)) = retain_history {
5986        Ok(Some(plan_retain_history(scx, lcw)?))
5987    } else {
5988        Ok(None)
5989    }
5990}
5991
5992// Converts a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5993// `DisableCompaction`. A zero duration is an internal error, because the `OptionalDuration`
5994// type already converts a zero duration into `None` before we get here. This function must not
5995// be called in the `RESET (RETAIN HISTORY)` path, which should be handled by the outer
5996// `Option<OptionalDuration>` being `None`.
5997fn plan_retain_history(
5998    scx: &StatementContext,
5999    lcw: Option<Duration>,
6000) -> Result<CompactionWindow, PlanError> {
6001    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
6002    match lcw {
6003        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
6004        // disable compaction), and should never occur here. Furthermore, some things actually do
6005        // break when this is set to real zero:
6006        // https://github.com/MaterializeInc/database-issues/issues/3798.
6007        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
6008            option_name: "RETAIN HISTORY".to_string(),
6009            err: Box::new(PlanError::Unstructured(
6010                "internal error: unexpectedly zero".to_string(),
6011            )),
6012        }),
6013        Some(duration) => {
6014            // Error if the duration is low and enable_unlimited_retain_history is not set (which
6015            // should only be possible during testing).
6016            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
6017                && scx
6018                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6019                    .is_err()
6020            {
6021                return Err(PlanError::RetainHistoryLow {
6022                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
6023                });
6024            }
6025            Ok(duration.try_into()?)
6026        }
6027        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction
6028        // is a bad choice, so prevent it unless ENABLE_UNLIMITED_RETAIN_HISTORY is set.
6029        None => {
6030            if scx
6031                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6032                .is_err()
6033            {
6034                Err(PlanError::RetainHistoryRequired)
6035            } else {
6036                Ok(CompactionWindow::DisableCompaction)
6037            }
6038        }
6039    }
6040}
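
// Illustrative sketch of how the arms above map onto SQL (durations are example
// values; the syntax is the one consumed by `alter_retain_history` below):
//
//   RETAIN HISTORY FOR '1hr'  -- Some(non-zero duration): a compaction window
//   RETAIN HISTORY FOR '0'    -- OptionalDuration turns zero into None: disables
//                             -- compaction, gated on ENABLE_UNLIMITED_RETAIN_HISTORY
//   Some(Duration::ZERO)      -- unreachable here; reported as an internal error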
6041
6042generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6043
6044fn plan_index_options(
6045    scx: &StatementContext,
6046    with_opts: Vec<IndexOption<Aug>>,
6047) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6048    if !with_opts.is_empty() {
6049        // Index options are not durable.
6050        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6051    }
6052
6053    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6054
6055    let mut out = Vec::with_capacity(1);
6056    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6057        out.push(crate::plan::IndexOption::RetainHistory(cw));
6058    }
6059    Ok(out)
6060}
6061
6062generate_extracted_config!(
6063    TableOption,
6064    (PartitionBy, Vec<Ident>),
6065    (RetainHistory, OptionalDuration),
6066    (RedactedTest, String)
6067);
6068
6069fn plan_table_options(
6070    scx: &StatementContext,
6071    desc: &RelationDesc,
6072    with_opts: Vec<TableOption<Aug>>,
6073) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6074    let TableOptionExtracted {
6075        partition_by,
6076        retain_history,
6077        redacted_test,
6078        ..
6079    }: TableOptionExtracted = with_opts.try_into()?;
6080
6081    if let Some(partition_by) = partition_by {
6082        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6083        check_partition_by(desc, partition_by)?;
6084    }
6085
6086    if redacted_test.is_some() {
6087        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6088    }
6089
6090    let mut out = Vec::with_capacity(1);
6091    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6092        out.push(crate::plan::TableOption::RetainHistory(cw));
6093    }
6094    Ok(out)
6095}
6096
6097pub fn plan_alter_index_options(
6098    scx: &mut StatementContext,
6099    AlterIndexStatement {
6100        index_name,
6101        if_exists,
6102        action,
6103    }: AlterIndexStatement<Aug>,
6104) -> Result<Plan, PlanError> {
6105    let object_type = ObjectType::Index;
6106    match action {
6107        AlterIndexAction::ResetOptions(options) => {
6108            let mut options = options.into_iter();
6109            if let Some(opt) = options.next() {
6110                match opt {
6111                    IndexOptionName::RetainHistory => {
6112                        if options.next().is_some() {
6113                            sql_bail!("RETAIN HISTORY must be the only option");
6114                        }
6115                        return alter_retain_history(
6116                            scx,
6117                            object_type,
6118                            if_exists,
6119                            UnresolvedObjectName::Item(index_name),
6120                            None,
6121                        );
6122                    }
6123                }
6124            }
6125            sql_bail!("expected at least one option");
6126        }
6127        AlterIndexAction::SetOptions(options) => {
6128            let mut options = options.into_iter();
6129            if let Some(opt) = options.next() {
6130                match opt.name {
6131                    IndexOptionName::RetainHistory => {
6132                        if options.next().is_some() {
6133                            sql_bail!("RETAIN HISTORY must be the only option");
6134                        }
6135                        return alter_retain_history(
6136                            scx,
6137                            object_type,
6138                            if_exists,
6139                            UnresolvedObjectName::Item(index_name),
6140                            opt.value,
6141                        );
6142                    }
6143                }
6144            }
6145            sql_bail!("expected at least one option");
6146        }
6147    }
6148}
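
// Illustrative sketch (hypothetical index name): RETAIN HISTORY is currently the
// only alterable index option, and it must appear alone:
//
//   ALTER INDEX i SET (RETAIN HISTORY FOR '30m');  -- routes to alter_retain_history
//   ALTER INDEX i RESET (RETAIN HISTORY);          -- restores the default window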
6149
6150pub fn describe_alter_cluster_set_options(
6151    _: &StatementContext,
6152    _: AlterClusterStatement<Aug>,
6153) -> Result<StatementDesc, PlanError> {
6154    Ok(StatementDesc::new(None))
6155}
6156
6157pub fn plan_alter_cluster(
6158    scx: &mut StatementContext,
6159    AlterClusterStatement {
6160        name,
6161        action,
6162        if_exists,
6163    }: AlterClusterStatement<Aug>,
6164) -> Result<Plan, PlanError> {
6165    let cluster = match resolve_cluster(scx, &name, if_exists)? {
6166        Some(entry) => entry,
6167        None => {
6168            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6169                name: name.to_ast_string_simple(),
6170                object_type: ObjectType::Cluster,
6171            });
6172
6173            return Ok(Plan::AlterNoop(AlterNoopPlan {
6174                object_type: ObjectType::Cluster,
6175            }));
6176        }
6177    };
6178
6179    let mut options: PlanClusterOption = Default::default();
6180    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6181
6182    match action {
6183        AlterClusterAction::SetOptions {
6184            options: set_options,
6185            with_options,
6186        } => {
6187            let ClusterOptionExtracted {
6188                availability_zones,
6189                introspection_debugging,
6190                introspection_interval,
6191                managed,
6192                replicas: replica_defs,
6193                replication_factor,
6194                seen: _,
6195                size,
6196                disk,
6197                schedule,
6198                workload_class,
6199            }: ClusterOptionExtracted = set_options.try_into()?;
6200
6201            if !scx.catalog.active_role_id().is_system() && workload_class.is_some() {
6202                sql_bail!("WORKLOAD CLASS not supported for non-system users");
6203            }
6206
6207            match managed.unwrap_or_else(|| cluster.is_managed()) {
6208                true => {
6209                    let alter_strategy_extracted =
6210                        ClusterAlterOptionExtracted::try_from(with_options)?;
6211                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6212
6213                    match alter_strategy {
6214                        AlterClusterPlanStrategy::None => {}
6215                        _ => {
6216                            scx.require_feature_flag(
6217                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6218                            )?;
6219                        }
6220                    }
6221
6222                    if replica_defs.is_some() {
6223                        sql_bail!("REPLICAS not supported for managed clusters");
6224                    }
6225                    if schedule.is_some()
6226                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6227                    {
6228                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6229                    }
6230
6231                    if let Some(replication_factor) = replication_factor {
6232                        if schedule.is_some()
6233                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6234                        {
6235                            sql_bail!(
6236                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6237                            );
6238                        }
6239                        if let Some(current_schedule) = cluster.schedule() {
6240                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6241                                sql_bail!(
6242                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6243                                );
6244                            }
6245                        }
6246
6247                        let internal_replica_count =
6248                            cluster.replicas().iter().filter(|r| r.internal()).count();
6249                        let hypothetical_replica_count =
6250                            internal_replica_count + usize::cast_from(replication_factor);
6251
6252                        // Total number of replicas running is internal replicas
6253                        // + replication factor.
6254                        if contains_single_replica_objects(scx, cluster)
6255                            && hypothetical_replica_count > 1
6256                        {
6257                            return Err(PlanError::CreateReplicaFailStorageObjects {
6258                                current_replica_count: cluster.replica_ids().iter().count(),
6259                                internal_replica_count,
6260                                hypothetical_replica_count,
6261                            });
6262                        }
6263                    } else if alter_strategy.is_some() {
6264                        // Alter strategies other than `None` stand up pending replicas of the new
6265                        // configuration, which would violate the single-replica constraint for
6266                        // sources and sinks. If the cluster hosts any such storage objects, fail.
6267                        let internal_replica_count =
6268                            cluster.replicas().iter().filter(|r| r.internal()).count();
6269                        let hypothetical_replica_count = internal_replica_count * 2;
6270                        if contains_single_replica_objects(scx, cluster) {
6271                            return Err(PlanError::CreateReplicaFailStorageObjects {
6272                                current_replica_count: cluster.replica_ids().iter().count(),
6273                                internal_replica_count,
6274                                hypothetical_replica_count,
6275                            });
6276                        }
6277                    }
6278                }
6279                false => {
6280                    if !alter_strategy.is_none() {
6281                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6282                    }
6283                    if availability_zones.is_some() {
6284                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6285                    }
6286                    if replication_factor.is_some() {
6287                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6288                    }
6289                    if introspection_debugging.is_some() {
6290                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6291                    }
6292                    if introspection_interval.is_some() {
6293                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6294                    }
6295                    if size.is_some() {
6296                        sql_bail!("SIZE not supported for unmanaged clusters");
6297                    }
6298                    if disk.is_some() {
6299                        sql_bail!("DISK not supported for unmanaged clusters");
6300                    }
6301                    if schedule.is_some()
6302                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6303                    {
6304                        sql_bail!(
6305                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6306                        );
6307                    }
6308                    if let Some(current_schedule) = cluster.schedule() {
6309                        if !matches!(current_schedule, ClusterSchedule::Manual)
6310                            && schedule.is_none()
6311                        {
6312                            sql_bail!(
6313                                "when switching a cluster to unmanaged, if the managed \
6314                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6315                                explicitly set the SCHEDULE to MANUAL"
6316                            );
6317                        }
6318                    }
6319                }
6320            }
6321
6322            let mut replicas = vec![];
6323            for ReplicaDefinition { name, options } in
6324                replica_defs.into_iter().flat_map(Vec::into_iter)
6325            {
6326                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6327            }
6328
6329            if let Some(managed) = managed {
6330                options.managed = AlterOptionParameter::Set(managed);
6331            }
6332            if let Some(replication_factor) = replication_factor {
6333                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6334            }
6335            if let Some(size) = &size {
6336                options.size = AlterOptionParameter::Set(size.clone());
6337            }
6338            if let Some(availability_zones) = availability_zones {
6339                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6340            }
6341            if let Some(introspection_debugging) = introspection_debugging {
6342                options.introspection_debugging =
6343                    AlterOptionParameter::Set(introspection_debugging);
6344            }
6345            if let Some(introspection_interval) = introspection_interval {
6346                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6347            }
6348            if disk.is_some() {
6349                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6350                // `cc` sizes. The long-term plan is to phase out the legacy sizes, at which point
6351                // we'll be able to remove the `DISK` option entirely.
6352                let size = size.as_deref().unwrap_or_else(|| {
6353                    cluster.managed_size().expect("cluster known to be managed")
6354                });
6355                if scx.catalog.is_cluster_size_cc(size) {
6356                    sql_bail!(
6357                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6358                    );
6359                }
6360
6361                scx.catalog
6362                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6363            }
6364            if !replicas.is_empty() {
6365                options.replicas = AlterOptionParameter::Set(replicas);
6366            }
6367            if let Some(schedule) = schedule {
6368                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6369            }
6370            if let Some(workload_class) = workload_class {
6371                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6372            }
6373        }
6374        AlterClusterAction::ResetOptions(reset_options) => {
6375            use AlterOptionParameter::Reset;
6376            use ClusterOptionName::*;
6377
6378            if !scx.catalog.active_role_id().is_system()
6379                && reset_options.contains(&WorkloadClass)
6380            {
6381                sql_bail!("WORKLOAD CLASS not supported for non-system users");
6382            }
6383
6384            for option in reset_options {
6385                match option {
6386                    AvailabilityZones => options.availability_zones = Reset,
6387                    Disk => scx
6388                        .catalog
6389                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6390                    IntrospectionInterval => options.introspection_interval = Reset,
6391                    IntrospectionDebugging => options.introspection_debugging = Reset,
6392                    Managed => options.managed = Reset,
6393                    Replicas => options.replicas = Reset,
6394                    ReplicationFactor => options.replication_factor = Reset,
6395                    Size => options.size = Reset,
6396                    Schedule => options.schedule = Reset,
6397                    WorkloadClass => options.workload_class = Reset,
6398                }
6399            }
6400        }
6401    }
6402    Ok(Plan::AlterCluster(AlterClusterPlan {
6403        id: cluster.id(),
6404        name: cluster.name().to_string(),
6405        options,
6406        strategy: alter_strategy,
6407    }))
6408}
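
// Illustrative sketch (hypothetical cluster name and option values): managed and
// unmanaged clusters accept disjoint option sets, e.g.
//
//   ALTER CLUSTER prod SET (REPLICATION FACTOR 2);       -- managed clusters only
//   ALTER CLUSTER prod SET (REPLICAS (r1 (SIZE '1')));   -- unmanaged clusters only
//   ALTER CLUSTER prod RESET (INTROSPECTION INTERVAL);   -- revert to the default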
6409
6410pub fn describe_alter_set_cluster(
6411    _: &StatementContext,
6412    _: AlterSetClusterStatement<Aug>,
6413) -> Result<StatementDesc, PlanError> {
6414    Ok(StatementDesc::new(None))
6415}
6416
6417pub fn plan_alter_item_set_cluster(
6418    scx: &StatementContext,
6419    AlterSetClusterStatement {
6420        if_exists,
6421        set_cluster: in_cluster_name,
6422        name,
6423        object_type,
6424    }: AlterSetClusterStatement<Aug>,
6425) -> Result<Plan, PlanError> {
6426    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6427
6428    let object_type = object_type.into();
6429
6430    // Prevent access to `SET CLUSTER` for unsupported objects.
6431    match object_type {
6432        ObjectType::MaterializedView => {}
6433        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6434            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6435        }
6436        _ => {
6437            bail_never_supported!(
6438                format!("ALTER {object_type} SET CLUSTER"),
6439                "sql/alter-set-cluster/",
6440                format!("{object_type} has no associated cluster")
6441            )
6442        }
6443    }
6444
6445    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6446
6447    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6448        Some(entry) => {
6449            let current_cluster = entry.cluster_id();
6450            let Some(current_cluster) = current_cluster else {
6451                sql_bail!("No cluster associated with {name}");
6452            };
6453
6454            if current_cluster == in_cluster.id() {
6455                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6456            } else {
6457                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6458                    id: entry.id(),
6459                    set_cluster: in_cluster.id(),
6460                }))
6461            }
6462        }
6463        None => {
6464            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6465                name: name.to_ast_string_simple(),
6466                object_type,
6467            });
6468
6469            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6470        }
6471    }
6472}
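
// Illustrative sketch (hypothetical object and cluster names): only materialized
// views can currently be re-homed, and setting the current cluster is a no-op:
//
//   ALTER MATERIALIZED VIEW mv SET CLUSTER analytics;  -- plans AlterSetCluster
//   ALTER SOURCE src SET CLUSTER analytics;            -- unsupported (see above)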
6473
6474pub fn describe_alter_object_rename(
6475    _: &StatementContext,
6476    _: AlterObjectRenameStatement,
6477) -> Result<StatementDesc, PlanError> {
6478    Ok(StatementDesc::new(None))
6479}
6480
6481pub fn plan_alter_object_rename(
6482    scx: &mut StatementContext,
6483    AlterObjectRenameStatement {
6484        name,
6485        object_type,
6486        to_item_name,
6487        if_exists,
6488    }: AlterObjectRenameStatement,
6489) -> Result<Plan, PlanError> {
6490    let object_type = object_type.into();
6491    match (object_type, name) {
6492        (
6493            ObjectType::View
6494            | ObjectType::MaterializedView
6495            | ObjectType::Table
6496            | ObjectType::Source
6497            | ObjectType::Index
6498            | ObjectType::Sink
6499            | ObjectType::Secret
6500            | ObjectType::Connection,
6501            UnresolvedObjectName::Item(name),
6502        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6503        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6504            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6505        }
6506        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6507            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6508        }
6509        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6510            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6511        }
6512        (object_type, name) => {
6513            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6514        }
6515    }
6516}
6517
6518pub fn plan_alter_schema_rename(
6519    scx: &mut StatementContext,
6520    name: UnresolvedSchemaName,
6521    to_schema_name: Ident,
6522    if_exists: bool,
6523) -> Result<Plan, PlanError> {
6524    // Special case for mz_temp: with lazy temporary schema creation, the temp
6525    // schema may not exist yet, but we still need to return the correct error.
6526    // Check the schema name directly against MZ_TEMP_SCHEMA.
6527    let normalized = normalize::unresolved_schema_name(name.clone())?;
6528    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6529        sql_bail!(
6530            "cannot rename schemas in the ambient database: {:?}",
6531            mz_repr::namespaces::MZ_TEMP_SCHEMA
6532        );
6533    }
6534
6535    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6536        let object_type = ObjectType::Schema;
6537        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6538            name: name.to_ast_string_simple(),
6539            object_type,
6540        });
6541        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6542    };
6543
6544    // Make sure the name is unique.
6545    if scx
6546        .resolve_schema_in_database(&db_spec, &to_schema_name)
6547        .is_ok()
6548    {
6549        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6550            to_schema_name.clone().into_string(),
6551        )));
6552    }
6553
6554    // Prevent users from renaming system related schemas.
6555    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6556    if schema.id().is_system() {
6557        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6558    }
6559
6560    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6561        cur_schema_spec: (db_spec, schema_spec),
6562        new_schema_name: to_schema_name.into_string(),
6563    }))
6564}
6565
6566pub fn plan_alter_schema_swap<F>(
6567    scx: &mut StatementContext,
6568    name_a: UnresolvedSchemaName,
6569    name_b: Ident,
6570    gen_temp_suffix: F,
6571) -> Result<Plan, PlanError>
6572where
6573    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6574{
6575    // Special case for mz_temp: with lazy temporary schema creation, the temp
6576    // schema may not exist yet, but we still need to return the correct error.
6577    // Check the schema name directly against MZ_TEMP_SCHEMA.
6578    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6579    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6580    {
6581        sql_bail!("cannot swap schemas that are in the ambient database");
6582    }
6583    // Also check name_b (the target schema name)
6584    let name_b_str = normalize::ident_ref(&name_b);
6585    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6586        sql_bail!("cannot swap schemas that are in the ambient database");
6587    }
6588
6589    let schema_a = scx.resolve_schema(name_a.clone())?;
6590
6591    let db_spec = schema_a.database().clone();
6592    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6593        sql_bail!("cannot swap schemas that are in the ambient database");
6594    };
6595    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6596
6597    // We cannot swap system schemas.
6598    if schema_a.id().is_system() || schema_b.id().is_system() {
6599        bail_never_supported!("swapping a system schema".to_string())
6600    }
6601
6602    // Generate a temporary name we can swap schema_a to.
6603    //
6604    // 'check' returns if the temp schema name would be valid.
6605    // 'check' returns whether the candidate temp schema name is free to use.
6606        let mut temp_name = ident!("mz_schema_swap_");
6607        temp_name.append_lossy(temp_suffix);
6608        scx.resolve_schema_in_database(&db_spec, &temp_name)
6609            .is_err()
6610    };
6611    let temp_suffix = gen_temp_suffix(&check)?;
6612    let name_temp = format!("mz_schema_swap_{temp_suffix}");
6613
6614    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6615        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6616        schema_a_name: schema_a.name().schema.to_string(),
6617        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6618        schema_b_name: schema_b.name().schema.to_string(),
6619        name_temp,
6620    }))
6621}
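
// Illustrative sketch (hypothetical schema names): the resulting plan carries a
// uniquely named `mz_schema_swap_<suffix>` intermediate through which the two
// schemas are renamed:
//
//   ALTER SCHEMA blue SWAP WITH green;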
6622
6623pub fn plan_alter_item_rename(
6624    scx: &mut StatementContext,
6625    object_type: ObjectType,
6626    name: UnresolvedItemName,
6627    to_item_name: Ident,
6628    if_exists: bool,
6629) -> Result<Plan, PlanError> {
6630    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6631        Ok(r) => r,
6632        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6633        Err(PlanError::MismatchedObjectType {
6634            name,
6635            is_type: ObjectType::MaterializedView,
6636            expected_type: ObjectType::View,
6637        }) => {
6638            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6639        }
6640        e => e?,
6641    };
6642
6643    match resolved {
6644        Some(entry) => {
6645            let full_name = scx.catalog.resolve_full_name(entry.name());
6646            let item_type = entry.item_type();
6647
6648            let proposed_name = QualifiedItemName {
6649                qualifiers: entry.name().qualifiers.clone(),
6650                item: to_item_name.clone().into_string(),
6651            };
6652
6653            // For PostgreSQL compatibility, items and types cannot have
6654            // overlapping names in a variety of situations. See the comment on
6655            // `CatalogItemType::conflicts_with_type` for details.
6656            let conflicting_type_exists;
6657            let conflicting_item_exists;
6658            if item_type == CatalogItemType::Type {
6659                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6660                conflicting_item_exists = scx
6661                    .catalog
6662                    .get_item_by_name(&proposed_name)
6663                    .map(|item| item.item_type().conflicts_with_type())
6664                    .unwrap_or(false);
6665            } else {
6666                conflicting_type_exists = item_type.conflicts_with_type()
6667                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6668                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6669            };
6670            if conflicting_type_exists || conflicting_item_exists {
6671                sql_bail!("catalog item '{}' already exists", to_item_name);
6672            }
6673
6674            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6675                id: entry.id(),
6676                current_full_name: full_name,
6677                to_name: normalize::ident(to_item_name),
6678                object_type,
6679            }))
6680        }
6681        None => {
6682            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6683                name: name.to_ast_string_simple(),
6684                object_type,
6685            });
6686
6687            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6688        }
6689    }
6690}
6691
6692pub fn plan_alter_cluster_rename(
6693    scx: &mut StatementContext,
6694    object_type: ObjectType,
6695    name: Ident,
6696    to_name: Ident,
6697    if_exists: bool,
6698) -> Result<Plan, PlanError> {
6699    match resolve_cluster(scx, &name, if_exists)? {
6700        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6701            id: entry.id(),
6702            name: entry.name().to_string(),
6703            to_name: ident(to_name),
6704        })),
6705        None => {
6706            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6707                name: name.to_ast_string_simple(),
6708                object_type,
6709            });
6710
6711            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6712        }
6713    }
6714}
6715
6716pub fn plan_alter_cluster_swap<F>(
6717    scx: &mut StatementContext,
6718    name_a: Ident,
6719    name_b: Ident,
6720    gen_temp_suffix: F,
6721) -> Result<Plan, PlanError>
6722where
6723    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6724{
6725    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6726    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6727
6728    let check = |temp_suffix: &str| {
6729        let mut temp_name = ident!("mz_cluster_swap_");
6730        temp_name.append_lossy(temp_suffix);
6731        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6732            // Temp name does not exist, so we can use it.
6733            Err(CatalogError::UnknownCluster(_)) => true,
6734            // Temp name already exists!
6735            Ok(_) | Err(_) => false,
6736        }
6737    };
6738    let temp_suffix = gen_temp_suffix(&check)?;
6739    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6740
6741    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6742        id_a: cluster_a.id(),
6743        id_b: cluster_b.id(),
6744        name_a: name_a.into_string(),
6745        name_b: name_b.into_string(),
6746        name_temp,
6747    }))
6748}
6749
6750pub fn plan_alter_cluster_replica_rename(
6751    scx: &mut StatementContext,
6752    object_type: ObjectType,
6753    name: QualifiedReplica,
6754    to_item_name: Ident,
6755    if_exists: bool,
6756) -> Result<Plan, PlanError> {
6757    match resolve_cluster_replica(scx, &name, if_exists)? {
6758        Some((cluster, replica)) => {
6759            ensure_cluster_is_not_managed(scx, cluster.id())?;
6760            Ok(Plan::AlterClusterReplicaRename(
6761                AlterClusterReplicaRenamePlan {
6762                    cluster_id: cluster.id(),
6763                    replica_id: replica,
6764                    name: QualifiedReplica {
6765                        cluster: Ident::new(cluster.name())?,
6766                        replica: name.replica,
6767                    },
6768                    to_name: normalize::ident(to_item_name),
6769                },
6770            ))
6771        }
6772        None => {
6773            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6774                name: name.to_ast_string_simple(),
6775                object_type,
6776            });
6777
6778            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6779        }
6780    }
6781}
6782
6783pub fn describe_alter_object_swap(
6784    _: &StatementContext,
6785    _: AlterObjectSwapStatement,
6786) -> Result<StatementDesc, PlanError> {
6787    Ok(StatementDesc::new(None))
6788}
6789
6790pub fn plan_alter_object_swap(
6791    scx: &mut StatementContext,
6792    stmt: AlterObjectSwapStatement,
6793) -> Result<Plan, PlanError> {
6794    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6795
6796    let AlterObjectSwapStatement {
6797        object_type,
6798        name_a,
6799        name_b,
6800    } = stmt;
6801    let object_type = object_type.into();
6802
6803    // We'll try 10 times to generate a temporary suffix.
6804    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6805        let mut attempts = 0;
6806        let name_temp = loop {
6807            attempts += 1;
6808            if attempts > 10 {
6809                tracing::warn!("Unable to generate temp id for swapping");
6810                sql_bail!("unable to generate a unique temporary name for swap");
6811            }
6812
6813            // Call the provided closure to make sure this name is unique!
6814            let short_id = mz_ore::id_gen::temp_id();
6815            if check_fn(&short_id) {
6816                break short_id;
6817            }
6818        };
6819
6820        Ok(name_temp)
6821    };
6822
6823    match (object_type, name_a, name_b) {
6824        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6825            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6826        }
6827        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6828            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6829        }
6830        (object_type, _, _) => Err(PlanError::Unsupported {
6831            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6832            discussion_no: None,
6833        }),
6834    }
6835}
6836
6837pub fn describe_alter_retain_history(
6838    _: &StatementContext,
6839    _: AlterRetainHistoryStatement<Aug>,
6840) -> Result<StatementDesc, PlanError> {
6841    Ok(StatementDesc::new(None))
6842}
6843
6844pub fn plan_alter_retain_history(
6845    scx: &StatementContext,
6846    AlterRetainHistoryStatement {
6847        object_type,
6848        if_exists,
6849        name,
6850        history,
6851    }: AlterRetainHistoryStatement<Aug>,
6852) -> Result<Plan, PlanError> {
6853    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6854}
6855
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            // View gets a special error below.
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // Save the original value so we can write it back into the create_sql catalog item.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                // None means RESET, so use the default compaction window.
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

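/// Plans `ALTER SECRET ... AS ...`, which replaces the contents of an existing
/// secret.
///
/// An illustrative invocation (the name and value are hypothetical):
///
/// ```sql
/// ALTER SECRET kafka_password AS 'new-password';
/// ```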
pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

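/// Plans `ALTER CONNECTION`, which either rotates the key pairs of an SSH
/// connection or sets/drops individual connection options, optionally
/// revalidating the connection against the external system.
///
/// Illustrative invocations (names and values are hypothetical):
///
/// ```sql
/// ALTER CONNECTION ssh_conn ROTATE KEYS;
/// ALTER CONNECTION kafka_conn SET (BROKER 'broker2:9092') WITH (VALIDATE = true);
/// ```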
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Collect all options irrespective of action taken on them.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition operations into set and drop
    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Type check values and avoid duplicates; we don't want to, e.g., let
    // users drop and set the same option in the same statement, so treating
    // drops as sets here is fine.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow setting _and_ resetting mutually exclusive options
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option is either set or dropped, ensure all mutually exclusive
        // options are dropped. We do this "behind the scenes", even though we
        // disallow users from performing the same action, because this is the
        // mechanism by which we overwrite values elsewhere in the code.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

        // n.b. if mutually exclusive options are set, those will error when we
        // try to replan the connection.
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

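/// Plans `ALTER SINK`, which currently only supports retargeting a sink at a
/// new upstream relation. The original `CREATE SINK` statement is re-planned
/// with the `FROM` relation swapped out to verify the new configuration.
///
/// An illustrative invocation (names are hypothetical):
///
/// ```sql
/// ALTER SINK avro_sink SET FROM new_quotes;
/// ```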
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always ALTER objects from their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // First we reconstruct the original CREATE SINK statement
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Then resolve the statement and swap its `from` relation for the new one
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            // Finally re-plan the modified create sink statement to verify the new configuration is valid
            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // TODO: describe the statement's options here.
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

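/// Plans `ALTER SOURCE`. Most source options cannot be changed after creation;
/// the only action planned here directly is adjusting `RETAIN HISTORY`, which
/// delegates to `alter_retain_history`. Subsource changes are instead handled
/// during purification.
///
/// An illustrative invocation (the source name is hypothetical):
///
/// ```sql
/// ALTER SOURCE pg_source SET (RETAIN HISTORY FOR '2h');
/// ```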
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            // n.b. we use this statement in purification in a way that cannot be
            // planned directly.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

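/// Plans `ALTER SYSTEM SET`, which assigns a new value to a system variable.
///
/// An illustrative invocation (the variable and value are examples only):
///
/// ```sql
/// ALTER SYSTEM SET max_tables = 100;
/// ```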
pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

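/// Plans `ALTER ROLE`, which updates either a role's attributes or one of the
/// role's default session variables.
///
/// Illustrative invocations (the role, variable, and value are hypothetical):
///
/// ```sql
/// ALTER ROLE analyst SET cluster = 'reporting';
/// ALTER ROLE analyst WITH INHERIT;
/// ```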
pub fn plan_alter_role(
    scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs, scx)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

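/// Plans `ALTER TABLE ... ADD COLUMN`, which appends a nullable column to the
/// latest version of a table. Gated behind the `ENABLE_ALTER_TABLE_ADD_COLUMN`
/// feature flag.
///
/// An illustrative invocation (names are hypothetical):
///
/// ```sql
/// ALTER TABLE t ADD COLUMN IF NOT EXISTS c text;
/// ```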
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                // Always add columns to the latest version of the item.
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.relation_desc().expect("table has desc").into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // TODO(alter_table): Support non-nullable columns with default values.
    let column_type = scalar_type.nullable(true);
    // "unresolve" our data type so we can later update the persisted create_sql.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_alter_materialized_view_apply_replacement(
    _: &StatementContext,
    _: AlterMaterializedViewApplyReplacementStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

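/// Plans `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`, which swaps a
/// materialized view's definition for a previously created replacement that
/// targets it. Gated behind the `ENABLE_REPLACEMENT_MATERIALIZED_VIEWS`
/// feature flag.
///
/// An illustrative invocation (names are hypothetical):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv APPLY REPLACEMENT mv_v2;
/// ```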
pub fn plan_alter_materialized_view_apply_replacement(
    scx: &StatementContext,
    stmt: AlterMaterializedViewApplyReplacementStatement,
) -> Result<Plan, PlanError> {
    let AlterMaterializedViewApplyReplacementStatement {
        if_exists,
        name,
        replacement_name,
    } = stmt;

    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

    let object_type = ObjectType::MaterializedView;
    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
        .expect("if_exists not set");

    if replacement.replacement_target() != Some(mv.id()) {
        return Err(PlanError::InvalidReplacement {
            item_type: mv.item_type(),
            item_name: scx.catalog.minimal_qualification(mv.name()),
            replacement_type: replacement.item_type(),
            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
        });
    }

    Ok(Plan::AlterMaterializedViewApplyReplacement(
        AlterMaterializedViewApplyReplacementPlan {
            id: mv.id(),
            replacement_id: replacement.id(),
        },
    ))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

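/// Plans `COMMENT ON ...`, which attaches a comment to a catalog object or to
/// a single column of a relation (passing `NULL` clears the comment).
///
/// Illustrative invocations (names are hypothetical):
///
/// ```sql
/// COMMENT ON TABLE orders IS 'all customer orders';
/// COMMENT ON COLUMN orders.status IS 'current fulfillment state';
/// ```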
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // TODO(parkmycar): Make max comment length configurable.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => {
            // Temporary schemas cannot have comments; they are connection-specific
            // and transient. With lazy temporary schema creation, the temp schema
            // may not exist yet, but we still need to return the correct error.
            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
                sql_bail!(
                    "cannot comment on schema {} because it is a temporary schema",
                    mz_repr::namespaces::MZ_TEMP_SCHEMA
                );
            }
            (
                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
                None,
            )
        }
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // Note: the `mz_comments` table uses an `Int4` for the column position, but in the catalog
    // storage we store a `usize`, which would be a `UInt8`. We check here that the position fits
    // in an `i32` because this is the easiest place to raise an error.
    //
    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

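/// Resolves `name` to a cluster in the catalog. If the cluster does not exist
/// and `if_exists` is set, returns `Ok(None)` so callers can plan a no-op;
/// otherwise the resolution error is propagated. The `resolve_*` helpers below
/// follow the same convention for other object kinds.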
pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

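/// Resolves `name` to a catalog item (or, when `object_type` is
/// `ObjectType::Type`, to a type) and verifies that the resolved item has the
/// expected object type. Returns `Ok(None)` when the item does not exist and
/// `if_exists` is set, so callers can plan a no-op.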
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns an error if the given cluster is a managed cluster.
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}