
// mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
    WebhookValidation, literal, plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

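// Matches managed replica names of the form `r<N>`, e.g. `r1` or `r42`.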
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns have types with a meaningful Persist-level ordering.
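///
/// For example, for a relation with columns `(a, b, c)`, `PARTITION BY (a)` and
/// `PARTITION BY (a, b)` are accepted (assuming `a` and `b` have order-preserving
/// types), while `PARTITION BY (b)` or `PARTITION BY (c, a)` are rejected because
/// they are not prefixes of the relation's columns.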
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
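    // Maps a relation version to the column change declared at that version via
    // `ColumnOption::Versioned`; replayed below to build the `VersionedRelationDesc`.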
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

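// Each `generate_extracted_config!` invocation below derives a `*Extracted`
// struct (e.g. `CreateSourceOptionExtracted`) with one field per listed option
// plus a `seen` set; the planners below use its `TryFrom` impl to destructure
// `WITH (...)` option lists.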
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources && in_cluster.replica_ids().len() > 1 {
        sql_bail!("cannot create webhook source in cluster with more than one replica")
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all options related to the
    // primary source output must be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

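/// Dispatches on the `CREATE SOURCE` connection type (Kafka, Postgres, SQL
/// Server, MySQL, or load generator) to the corresponding planning helper.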
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // text/exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
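    // The `DETAILS` option round-trips as hex-encoded protobuf; decode it back
    // into `MySqlSourceDetails` here.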
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // text columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        // exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _, // purified into `start_offset`
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
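    // Offsets are positional: the i-th entry in the `START OFFSET` list applies
    // to partition i, so a list of `[0, 5]` starts partition 0 at offset 0 and
    // partition 1 at offset 5.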
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // handled below
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is overly conservative.
            // For example, in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

1347    // The KEY VALUE load generator is the only UPSERT source that
1348    // has no encoding but defaults to `INCLUDE KEY`.
1349    //
1350    // As discussed
1351    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1352    // removing this special case amounts to deciding how to handle null keys
1353    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
1354    // this special case in.
1355    //
1356    // Note that this is safe because this generator is
1357    // 1. The only source with no encoding that can have its key included.
1358    // 2. Never produces null keys (or values, for that matter).
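    //
    // A hypothetical sketch of such a statement (options elided):
    //
    //     CREATE SOURCE kv
    //       FROM LOAD GENERATOR KEY VALUE (KEYS = 16, ...)
    //       INCLUDE KEY;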
1359    let key_envelope_no_encoding = matches!(
1360        source_connection,
1361        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1362            load_generator: LoadGenerator::KeyValue(_),
1363            ..
1364        })
1365    );
1366    let mut key_envelope = get_key_envelope(
1367        include_metadata,
1368        encoding.as_ref(),
1369        key_envelope_no_encoding,
1370    )?;
1371
1372    match (&envelope, &key_envelope) {
1373        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1374        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1375            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1376        ),
1377        _ => {}
1378    };
1379
1380    // Not all source envelopes are compatible with all source connections.
1381    // Whoever constructs the source ingestion pipeline is responsible for
1382    // choosing compatible envelopes and connections.
1383    //
1384    // TODO(guswynn): unambiguously assert which connections and envelopes are
1385    // compatible in typechecking
1386    //
1387    // TODO: remove bails as more support for upsert is added.
1388    let envelope = match &envelope {
1389        // TODO: fixup key envelope
1390        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1391        ast::SourceEnvelope::Debezium => {
1392            // TODO: check that the key envelope is not set
1393            let after_idx = match typecheck_debezium(&value_desc) {
1394                Ok((_before_idx, after_idx)) => Ok(after_idx),
1395                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1396                    Some(DataEncoding::Avro(_)) => Err(type_err),
1397                    _ => Err(sql_err!(
1398                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1399                    )),
1400                },
1401            }?;
1402
1403            UnplannedSourceEnvelope::Upsert {
1404                style: UpsertStyle::Debezium { after_idx },
1405            }
1406        }
1407        ast::SourceEnvelope::Upsert {
1408            value_decode_err_policy,
1409        } => {
1410            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1411                None => {
1412                    if !key_envelope_no_encoding {
1413                        bail_unsupported!(format!(
1414                            "UPSERT requires a key/value format: {:?}",
1415                            format
1416                        ))
1417                    }
1418                    None
1419                }
1420                Some(key_encoding) => Some(key_encoding),
1421            };
1422            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
1423            // specified.
1424            if key_envelope == KeyEnvelope::None {
1425                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1426            }
1427            // If the value decode error policy is not set, we use the default upsert style.
1428            let style = match value_decode_err_policy.as_slice() {
1429                [] => UpsertStyle::Default(key_envelope),
1430                [SourceErrorPolicy::Inline { alias }] => {
1431                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1432                    UpsertStyle::ValueErrInline {
1433                        key_envelope,
1434                        error_column: alias
1435                            .as_ref()
1436                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1437                    }
1438                }
1439                _ => {
1440                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1441                }
1442            };
1443
1444            UnplannedSourceEnvelope::Upsert { style }
1445        }
1446        ast::SourceEnvelope::CdcV2 => {
1447            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1448            // TODO: check that the key envelope is not set
1449            match format {
1450                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1451                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1452            }
1453            UnplannedSourceEnvelope::CdcV2
1454        }
1455    };
1456
1457    let metadata_desc = included_column_desc(metadata_columns_desc);
1458    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1459
1460    Ok((desc, envelope, encoding))
1461}
1462
1463/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1464/// of columns and constraints.
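///
/// For example (hypothetical), columns `(a int4 NOT NULL, b text)` with a
/// `PRIMARY KEY (a)` constraint plan to a two-column `RelationDesc` whose
/// first column is non-nullable and serves as the key.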
1465fn plan_source_export_desc(
1466    scx: &StatementContext,
1467    name: &UnresolvedItemName,
1468    columns: &Vec<ColumnDef<Aug>>,
1469    constraints: &Vec<TableConstraint<Aug>>,
1470) -> Result<RelationDesc, PlanError> {
1471    let names: Vec<_> = columns
1472        .iter()
1473        .map(|c| normalize::column_name(c.name.clone()))
1474        .collect();
1475
1476    if let Some(dup) = names.iter().duplicates().next() {
1477        sql_bail!("column {} specified more than once", dup.quoted());
1478    }
1479
1480    // Build initial relation type that handles declared data types
1481    // and NOT NULL constraints.
1482    let mut column_types = Vec::with_capacity(columns.len());
1483    let mut keys = Vec::new();
1484
1485    for (i, c) in columns.into_iter().enumerate() {
1486        let aug_data_type = &c.data_type;
1487        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1488        let mut nullable = true;
1489        for option in &c.options {
1490            match &option.option {
1491                ColumnOption::NotNull => nullable = false,
1492                ColumnOption::Default(_) => {
1493                    bail_unsupported!("Source export with default value")
1494                }
1495                ColumnOption::Unique { is_primary } => {
1496                    keys.push(vec![i]);
1497                    if *is_primary {
1498                        nullable = false;
1499                    }
1500                }
1501                other => {
1502                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1503                }
1504            }
1505        }
1506        column_types.push(ty.nullable(nullable));
1507    }
1508
1509    let mut seen_primary = false;
1510    'c: for constraint in constraints {
1511        match constraint {
1512            TableConstraint::Unique {
1513                name: _,
1514                columns,
1515                is_primary,
1516                nulls_not_distinct,
1517            } => {
1518                if seen_primary && *is_primary {
1519                    sql_bail!(
1520                        "multiple primary keys for source export {} are not allowed",
1521                        name.to_ast_string_stable()
1522                    );
1523                }
1524                seen_primary = *is_primary || seen_primary;
1525
1526                let mut key = vec![];
1527                for column in columns {
1528                    let column = normalize::column_name(column.clone());
1529                    match names.iter().position(|name| *name == column) {
1530                        None => sql_bail!("unknown column in constraint: {}", column),
1531                        Some(i) => {
1532                            let nullable = &mut column_types[i].nullable;
1533                            if *is_primary {
1534                                if *nulls_not_distinct {
1535                                    sql_bail!(
1536                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1537                                    );
1538                                }
1539                                *nullable = false;
1540                            } else if !(*nulls_not_distinct || !*nullable) {
1541                                // Non-primary key unique constraints are only keys if all of their
1542                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1543                                break 'c;
1544                            }
1545
1546                            key.push(i);
1547                        }
1548                    }
1549                }
1550
1551                if *is_primary {
1552                    keys.insert(0, key);
1553                } else {
1554                    keys.push(key);
1555                }
1556            }
1557            TableConstraint::ForeignKey { .. } => {
1558                bail_unsupported!("Source export with a foreign key")
1559            }
1560            TableConstraint::Check { .. } => {
1561                bail_unsupported!("Source export with a check constraint")
1562            }
1563        }
1564    }
1565
1566    let typ = SqlRelationType::new(column_types).with_keys(keys);
1567    let desc = RelationDesc::new(typ, names);
1568    Ok(desc)
1569}
1570
1571generate_extracted_config!(
1572    CreateSubsourceOption,
1573    (Progress, bool, Default(false)),
1574    (ExternalReference, UnresolvedItemName),
1575    (RetainHistory, OptionalDuration),
1576    (TextColumns, Vec::<Ident>, Default(vec![])),
1577    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1578    (Details, String)
1579);
1580
1581pub fn plan_create_subsource(
1582    scx: &StatementContext,
1583    stmt: CreateSubsourceStatement<Aug>,
1584) -> Result<Plan, PlanError> {
1585    let CreateSubsourceStatement {
1586        name,
1587        columns,
1588        of_source,
1589        constraints,
1590        if_not_exists,
1591        with_options,
1592    } = &stmt;
1593
1594    let CreateSubsourceOptionExtracted {
1595        progress,
1596        retain_history,
1597        external_reference,
1598        text_columns,
1599        exclude_columns,
1600        details,
1601        seen: _,
1602    } = with_options.clone().try_into()?;
1603
1604    // This invariant is enforced during purification; we are responsible for
1605    // creating the AST for subsources as a response to CREATE SOURCE
1606    // statements, so this would fire in integration testing if we failed to
1607    // uphold it.
1608    assert!(
1609        progress ^ (external_reference.is_some() && of_source.is_some()),
1610        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1611    );
1612
1613    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615    let data_source = if let Some(source_reference) = of_source {
1616        // If the new source table syntax is forced, we should not be creating any
1617        // non-progress subsources.
1618        if scx.catalog.system_vars().enable_create_table_from_source()
1619            && scx.catalog.system_vars().force_source_table_syntax()
1620        {
1621            Err(PlanError::UseTablesForSources(
1622                "CREATE SUBSOURCE".to_string(),
1623            ))?;
1624        }
1625
1626        // This is a subsource with the "natural" dependency order, i.e. it is
1627        // not a legacy subsource with the inverted structure.
1628        let ingestion_id = *source_reference.item_id();
1629        let external_reference = external_reference.unwrap();
1630
1631        // Decode the details option stored on the subsource statement, which contains information
1632        // created during the purification process.
1633        let details = details
1634            .as_ref()
1635            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1636        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1637        let details =
1638            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1639        let details =
1640            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1641        let details = match details {
1642            SourceExportStatementDetails::Postgres { table } => {
1643                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1644                    column_casts: crate::pure::postgres::generate_column_casts(
1645                        scx,
1646                        &table,
1647                        &text_columns,
1648                    )?,
1649                    table,
1650                })
1651            }
1652            SourceExportStatementDetails::MySql {
1653                table,
1654                initial_gtid_set,
1655            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1656                table,
1657                initial_gtid_set,
1658                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1659                exclude_columns: exclude_columns
1660                    .into_iter()
1661                    .map(|c| c.into_string())
1662                    .collect(),
1663            }),
1664            SourceExportStatementDetails::SqlServer {
1665                table,
1666                capture_instance,
1667                initial_lsn,
1668            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1669                capture_instance,
1670                table,
1671                initial_lsn,
1672                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1673                exclude_columns: exclude_columns
1674                    .into_iter()
1675                    .map(|c| c.into_string())
1676                    .collect(),
1677            }),
1678            SourceExportStatementDetails::LoadGenerator { output } => {
1679                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1680            }
1681            SourceExportStatementDetails::Kafka {} => {
1682                bail_unsupported!("subsources cannot reference Kafka sources")
1683            }
1684        };
1685        DataSourceDesc::IngestionExport {
1686            ingestion_id,
1687            external_reference,
1688            details,
1689            // Subsources don't currently support non-default envelopes / encoding
1690            data_config: SourceExportDataConfig {
1691                envelope: SourceEnvelope::None(NoneEnvelope {
1692                    key_envelope: KeyEnvelope::None,
1693                    key_arity: 0,
1694                }),
1695                encoding: None,
1696            },
1697        }
1698    } else if progress {
1699        DataSourceDesc::Progress
1700    } else {
1701        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1702    };
1703
1704    let if_not_exists = *if_not_exists;
1705    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1706
1707    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1708
1709    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1710    let source = Source {
1711        create_sql,
1712        data_source,
1713        desc,
1714        compaction_window,
1715    };
1716
1717    Ok(Plan::CreateSource(CreateSourcePlan {
1718        name,
1719        source,
1720        if_not_exists,
1721        timeline: Timeline::EpochMilliseconds,
1722        in_cluster: None,
1723    }))
1724}
1725
1726generate_extracted_config!(
1727    TableFromSourceOption,
1728    (TextColumns, Vec::<Ident>, Default(vec![])),
1729    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1730    (PartitionBy, Vec<Ident>),
1731    (RetainHistory, OptionalDuration),
1732    (Details, String)
1733);
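// These options arrive on statements like the following hypothetical sketch
// (in practice the DETAILS option is populated during purification rather
// than written by hand):
//
//     CREATE TABLE t FROM SOURCE my_source (REFERENCE my_schema.my_table)
//       WITH (TEXT COLUMNS (a), EXCLUDE COLUMNS (b));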
1734
1735pub fn plan_create_table_from_source(
1736    scx: &StatementContext,
1737    stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739    if !scx.catalog.system_vars().enable_create_table_from_source() {
1740        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741    }
1742
1743    let CreateTableFromSourceStatement {
1744        name,
1745        columns,
1746        constraints,
1747        if_not_exists,
1748        source,
1749        external_reference,
1750        envelope,
1751        format,
1752        include_metadata,
1753        with_options,
1754    } = &stmt;
1755
1756    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758    let TableFromSourceOptionExtracted {
1759        text_columns,
1760        exclude_columns,
1761        retain_history,
1762        partition_by,
1763        details,
1764        seen: _,
1765    } = with_options.clone().try_into()?;
1766
1767    let source_item = scx.get_item_by_resolved_name(source)?;
1768    let ingestion_id = source_item.id();
1769
1770    // Decode the details option stored on the statement, which contains information
1771    // created during the purification process.
1772    let details = details
1773        .as_ref()
1774        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776    let details =
1777        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778    let details =
1779        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782        && include_metadata
1783            .iter()
1784            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785    {
1786        // TODO(guswynn): should this be `bail_unsupported!`?
1787        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1788    }
1789    if !matches!(
1790        details,
1791        SourceExportStatementDetails::Kafka { .. }
1792            | SourceExportStatementDetails::LoadGenerator { .. }
1793    ) && !include_metadata.is_empty()
1794    {
1795        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796    }
1797
1798    let details = match details {
1799        SourceExportStatementDetails::Postgres { table } => {
1800            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1801                column_casts: crate::pure::postgres::generate_column_casts(
1802                    scx,
1803                    &table,
1804                    &text_columns,
1805                )?,
1806                table,
1807            })
1808        }
1809        SourceExportStatementDetails::MySql {
1810            table,
1811            initial_gtid_set,
1812        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1813            table,
1814            initial_gtid_set,
1815            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1816            exclude_columns: exclude_columns
1817                .into_iter()
1818                .map(|c| c.into_string())
1819                .collect(),
1820        }),
1821        SourceExportStatementDetails::SqlServer {
1822            table,
1823            capture_instance,
1824            initial_lsn,
1825        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1826            table,
1827            capture_instance,
1828            initial_lsn,
1829            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830            exclude_columns: exclude_columns
1831                .into_iter()
1832                .map(|c| c.into_string())
1833                .collect(),
1834        }),
1835        SourceExportStatementDetails::LoadGenerator { output } => {
1836            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1837        }
1838        SourceExportStatementDetails::Kafka {} => {
1839            if !include_metadata.is_empty()
1840                && !matches!(
1841                    envelope,
1842                    ast::SourceEnvelope::Upsert { .. }
1843                        | ast::SourceEnvelope::None
1844                        | ast::SourceEnvelope::Debezium
1845                )
1846            {
1847                // TODO(guswynn): should this be `bail_unsupported!`?
1848                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1849            }
1850
1851            let metadata_columns = include_metadata
1852                .into_iter()
1853                .flat_map(|item| match item {
1854                    SourceIncludeMetadata::Timestamp { alias } => {
1855                        let name = match alias {
1856                            Some(name) => name.to_string(),
1857                            None => "timestamp".to_owned(),
1858                        };
1859                        Some((name, KafkaMetadataKind::Timestamp))
1860                    }
1861                    SourceIncludeMetadata::Partition { alias } => {
1862                        let name = match alias {
1863                            Some(name) => name.to_string(),
1864                            None => "partition".to_owned(),
1865                        };
1866                        Some((name, KafkaMetadataKind::Partition))
1867                    }
1868                    SourceIncludeMetadata::Offset { alias } => {
1869                        let name = match alias {
1870                            Some(name) => name.to_string(),
1871                            None => "offset".to_owned(),
1872                        };
1873                        Some((name, KafkaMetadataKind::Offset))
1874                    }
1875                    SourceIncludeMetadata::Headers { alias } => {
1876                        let name = match alias {
1877                            Some(name) => name.to_string(),
1878                            None => "headers".to_owned(),
1879                        };
1880                        Some((name, KafkaMetadataKind::Headers))
1881                    }
1882                    SourceIncludeMetadata::Header {
1883                        alias,
1884                        key,
1885                        use_bytes,
1886                    } => Some((
1887                        alias.to_string(),
1888                        KafkaMetadataKind::Header {
1889                            key: key.clone(),
1890                            use_bytes: *use_bytes,
1891                        },
1892                    )),
1893                    SourceIncludeMetadata::Key { .. } => {
1894                        // handled below
1895                        None
1896                    }
1897                })
1898                .collect();
1899
1900            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1901        }
1902    };
1903
1904    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1905
1906    // Some source types (e.g. postgres, mysql, multi-output load-gen sources) define a
1907    // value schema during purification and populate the `columns` and `constraints` fields
1908    // of the statement, whereas others (e.g. kafka, single-output load-gen sources) do not;
1909    // for the latter we fall back to the source connection's default schema.
1910    let (key_desc, value_desc) =
1911        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1912            let columns = match columns {
1913                TableFromSourceColumns::Defined(columns) => columns,
1914                _ => unreachable!(),
1915            };
1916            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1917            (None, desc)
1918        } else {
1919            let key_desc = source_connection.default_key_desc();
1920            let value_desc = source_connection.default_value_desc();
1921            (Some(key_desc), value_desc)
1922        };
1923
1924    let metadata_columns_desc = match &details {
1925        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1926            metadata_columns, ..
1927        }) => kafka_metadata_columns_desc(metadata_columns),
1928        _ => vec![],
1929    };
1930
1931    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1932        scx,
1933        &envelope,
1934        format,
1935        key_desc,
1936        value_desc,
1937        include_metadata,
1938        metadata_columns_desc,
1939        source_connection,
1940    )?;
1941    if let TableFromSourceColumns::Named(col_names) = columns {
1942        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1943    }
1944
1945    let names: Vec<_> = desc.iter_names().cloned().collect();
1946    if let Some(dup) = names.iter().duplicates().next() {
1947        sql_bail!("column {} specified more than once", dup.quoted());
1948    }
1949
1950    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1951
1952    // Allow users to specify a timeline. If they do not, determine a default
1953    // timeline for the source.
1954    let timeline = match envelope {
1955        SourceEnvelope::CdcV2 => {
1956            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1957        }
1958        _ => Timeline::EpochMilliseconds,
1959    };
1960
1961    if let Some(partition_by) = partition_by {
1962        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1963        check_partition_by(&desc, partition_by)?;
1964    }
1965
1966    let data_source = DataSourceDesc::IngestionExport {
1967        ingestion_id,
1968        external_reference: external_reference
1969            .as_ref()
1970            .expect("populated in purification")
1971            .clone(),
1972        details,
1973        data_config: SourceExportDataConfig { envelope, encoding },
1974    };
1975
1976    let if_not_exists = *if_not_exists;
1977
1978    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1979
1980    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1981    let table = Table {
1982        create_sql,
1983        desc: VersionedRelationDesc::new(desc),
1984        temporary: false,
1985        compaction_window,
1986        data_source: TableDataSource::DataSource {
1987            desc: data_source,
1988            timeline,
1989        },
1990    };
1991
1992    Ok(Plan::CreateTable(CreateTablePlan {
1993        name,
1994        table,
1995        if_not_exists,
1996    }))
1997}
1998
1999generate_extracted_config!(
2000    LoadGeneratorOption,
2001    (TickInterval, Duration),
2002    (AsOf, u64, Default(0_u64)),
2003    (UpTo, u64, Default(u64::MAX)),
2004    (ScaleFactor, f64),
2005    (MaxCardinality, u64),
2006    (Keys, u64),
2007    (SnapshotRounds, u64),
2008    (TransactionalSnapshot, bool),
2009    (ValueSize, u64),
2010    (Seed, u64),
2011    (Partitions, u64),
2012    (BatchSize, u64)
2013);
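// For instance, a counter load generator might be configured as (hypothetical
// sketch):
//
//     CREATE SOURCE my_counter FROM LOAD GENERATOR COUNTER
//       (TICK INTERVAL '500ms', MAX CARDINALITY 1000);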
2014
2015impl LoadGeneratorOptionExtracted {
2016    pub(super) fn ensure_only_valid_options(
2017        &self,
2018        loadgen: &ast::LoadGenerator,
2019    ) -> Result<(), PlanError> {
2020        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2021
2022        let mut options = self.seen.clone();
2023
2024        let permitted_options: &[_] = match loadgen {
2025            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2026            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2027            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2028            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2029            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2030            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2031            ast::LoadGenerator::KeyValue => &[
2032                TickInterval,
2033                Keys,
2034                SnapshotRounds,
2035                TransactionalSnapshot,
2036                ValueSize,
2037                Seed,
2038                Partitions,
2039                BatchSize,
2040            ],
2041        };
2042
2043        for o in permitted_options {
2044            options.remove(o);
2045        }
2046
2047        if !options.is_empty() {
2048            sql_bail!(
2049                "{} load generators do not support {} values",
2050                loadgen,
2051                options.iter().join(", ")
2052            )
2053        }
2054
2055        Ok(())
2056    }
2057}
2058
2059pub(crate) fn load_generator_ast_to_generator(
2060    scx: &StatementContext,
2061    loadgen: &ast::LoadGenerator,
2062    options: &[LoadGeneratorOption<Aug>],
2063    include_metadata: &[SourceIncludeMetadata],
2064) -> Result<LoadGenerator, PlanError> {
2065    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2066    extracted.ensure_only_valid_options(loadgen)?;
2067
2068    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2069        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2070    }
2071
2072    let load_generator = match loadgen {
2073        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2074        ast::LoadGenerator::Clock => {
2075            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2076            LoadGenerator::Clock
2077        }
2078        ast::LoadGenerator::Counter => {
2079            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2080            let LoadGeneratorOptionExtracted {
2081                max_cardinality, ..
2082            } = extracted;
2083            LoadGenerator::Counter { max_cardinality }
2084        }
2085        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2086        ast::LoadGenerator::Datums => {
2087            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2088            LoadGenerator::Datums
2089        }
2090        ast::LoadGenerator::Tpch => {
2091            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2092
2093            // Default to 0.01 scale factor (=10MB).
2094            let sf: f64 = scale_factor.unwrap_or(0.01);
2095            if !sf.is_finite() || sf < 0.0 {
2096                sql_bail!("unsupported scale factor {sf}");
2097            }
2098
2099            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2100                let total = (sf * multiplier).floor();
2101                let mut i = i64::try_cast_from(total)
2102                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2103                if i < 1 {
2104                    i = 1;
2105                }
2106                Ok(i)
2107            };
2108
2109            // The multiplications here are safely unchecked because they will
2110            // overflow to infinity, which will be caught by the checked cast in `f_to_i`.
2111            let count_supplier = f_to_i(10_000f64)?;
2112            let count_part = f_to_i(200_000f64)?;
2113            let count_customer = f_to_i(150_000f64)?;
2114            let count_orders = f_to_i(150_000f64 * 10f64)?;
2115            let count_clerk = f_to_i(1_000f64)?;
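
            // Worked example: with the default sf = 0.01 these evaluate to
            // 100 suppliers, 2_000 parts, 1_500 customers, 15_000 orders, and
            // 10 clerks (each result is clamped below at 1).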
2116
2117            LoadGenerator::Tpch {
2118                count_supplier,
2119                count_part,
2120                count_customer,
2121                count_orders,
2122                count_clerk,
2123            }
2124        }
2125        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2126            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2127            let LoadGeneratorOptionExtracted {
2128                keys,
2129                snapshot_rounds,
2130                transactional_snapshot,
2131                value_size,
2132                tick_interval,
2133                seed,
2134                partitions,
2135                batch_size,
2136                ..
2137            } = extracted;
2138
2139            let mut include_offset = None;
2140            for im in include_metadata {
2141                match im {
2142                    SourceIncludeMetadata::Offset { alias } => {
2143                        include_offset = match alias {
2144                            Some(alias) => Some(alias.to_string()),
2145                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2146                        }
2147                    }
2148                    SourceIncludeMetadata::Key { .. } => continue,
2149
2150                    _ => {
2151                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2152                    }
2153                };
2154            }
2155
2156            let lgkv = KeyValueLoadGenerator {
2157                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2158                snapshot_rounds: snapshot_rounds
2159                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2160                // Defaults to true.
2161                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2162                value_size: value_size
2163                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2164                partitions: partitions
2165                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2166                tick_interval,
2167                batch_size: batch_size
2168                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2169                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2170                include_offset,
2171            };
2172
2173            if lgkv.keys == 0
2174                || lgkv.partitions == 0
2175                || lgkv.value_size == 0
2176                || lgkv.batch_size == 0
2177            {
2178                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2179            }
2180
2181            if lgkv.keys % lgkv.partitions != 0 {
2182                sql_bail!("KEYS must be a multiple of PARTITIONS")
2183            }
2184
2185            if lgkv.batch_size > lgkv.keys {
2186                sql_bail!("KEYS must be larger than BATCH SIZE")
2187            }
2188
2189            // This constraint simplifies the source implementation.
2190            // We can lift it later.
2191            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2192                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2193            }
2194
2195            if lgkv.snapshot_rounds == 0 {
2196                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2197            }
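
            // Worked example: KEYS 128, PARTITIONS 4, BATCH SIZE 8 passes all
            // of the checks above: 128 % 4 == 0 and (128 / 4) % 8 == 0, so
            // each partition emits its 32 keys in batches of eight.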
2198
2199            LoadGenerator::KeyValue(lgkv)
2200        }
2201    };
2202
2203    Ok(load_generator)
2204}
2205
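/// Checks that a Debezium-style value relation has the expected shape: an
/// `after` column, and optionally a `before` column whose record type matches
/// `after` (conceptually `{"before": ... | null, "after": ...}`). Returns the
/// column indexes of `before` (if present) and `after`.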
2206fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2207    let before = value_desc.get_by_name(&"before".into());
2208    let (after_idx, after_ty) = value_desc
2209        .get_by_name(&"after".into())
2210        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2211    let before_idx = if let Some((before_idx, before_ty)) = before {
2212        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2213            sql_bail!("'before' column must be of type record");
2214        }
2215        if before_ty != after_ty {
2216            sql_bail!("'before' type differs from 'after' column");
2217        }
2218        Some(before_idx)
2219    } else {
2220        None
2221    };
2222    Ok((before_idx, after_idx))
2223}
2224
2225fn get_encoding(
2226    scx: &StatementContext,
2227    format: &FormatSpecifier<Aug>,
2228    envelope: &ast::SourceEnvelope,
2229) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2230    let encoding = match format {
2231        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2232        FormatSpecifier::KeyValue { key, value } => {
2233            let key = {
2234                let encoding = get_encoding_inner(scx, key)?;
2235                Some(encoding.key.unwrap_or(encoding.value))
2236            };
2237            let value = get_encoding_inner(scx, value)?.value;
2238            SourceDataEncoding { key, value }
2239        }
2240    };
2241
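    // For example (hypothetical), `ENVELOPE UPSERT` with a bare `FORMAT JSON`
    // is rejected below; a key/value spelling such as
    // `KEY FORMAT TEXT VALUE FORMAT JSON` is required instead.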
2242    let requires_keyvalue = matches!(
2243        envelope,
2244        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2245    );
2246    let is_keyvalue = encoding.key.is_some();
2247    if requires_keyvalue && !is_keyvalue {
2248        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2249    };
2250
2251    Ok(encoding)
2252}
2253
2254/// Determine the cluster ID to use for this item.
2255///
2256/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2257/// Because of this, do not normalize/canonicalize the create SQL statement
2258/// until after calling this function.
2259fn source_sink_cluster_config<'a, 'ctx>(
2260    scx: &'a StatementContext<'ctx>,
2261    in_cluster: &mut Option<ResolvedClusterName>,
2262) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2263    let cluster = match in_cluster {
2264        None => {
2265            let cluster = scx.catalog.resolve_cluster(None)?;
2266            *in_cluster = Some(ResolvedClusterName {
2267                id: cluster.id(),
2268                print_name: None,
2269            });
2270            cluster
2271        }
2272        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2273    };
2274
2275    Ok(cluster)
2276}
2277
2278generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2279
2280#[derive(Debug)]
2281pub struct Schema {
2282    pub key_schema: Option<String>,
2283    pub value_schema: String,
2284    /// Reference schemas for the key schema, in dependency order.
2285    pub key_reference_schemas: Vec<String>,
2286    /// Reference schemas for the value schema, in dependency order.
2287    pub value_reference_schemas: Vec<String>,
2288    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2289    pub confluent_wire_format: bool,
2290}
2291
2292fn get_encoding_inner(
2293    scx: &StatementContext,
2294    format: &Format<Aug>,
2295) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2296    let value = match format {
2297        Format::Bytes => DataEncoding::Bytes,
2298        Format::Avro(schema) => {
2299            let Schema {
2300                key_schema,
2301                value_schema,
2302                key_reference_schemas,
2303                value_reference_schemas,
2304                csr_connection,
2305                confluent_wire_format,
2306            } = match schema {
2307                // TODO(jldlaughlin): we need a way to pass in primary key information
2308                // when building a source from a string or file.
2309                AvroSchema::InlineSchema {
2310                    schema: ast::Schema { schema },
2311                    with_options,
2312                } => {
2313                    let AvroSchemaOptionExtracted {
2314                        confluent_wire_format,
2315                        ..
2316                    } = with_options.clone().try_into()?;
2317
2318                    Schema {
2319                        key_schema: None,
2320                        value_schema: schema.clone(),
2321                        key_reference_schemas: vec![],
2322                        value_reference_schemas: vec![],
2323                        csr_connection: None,
2324                        confluent_wire_format,
2325                    }
2326                }
2327                AvroSchema::Csr {
2328                    csr_connection:
2329                        CsrConnectionAvro {
2330                            connection,
2331                            seed,
2332                            key_strategy: _,
2333                            value_strategy: _,
2334                        },
2335                } => {
2336                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2337                    let csr_connection = match item.connection()? {
2338                        Connection::Csr(_) => item.id(),
2339                        _ => {
2340                            sql_bail!(
2341                                "{} is not a schema registry connection",
2342                                scx.catalog
2343                                    .resolve_full_name(item.name())
2344                                    .to_string()
2345                                    .quoted()
2346                            )
2347                        }
2348                    };
2349
2350                    if let Some(seed) = seed {
2351                        Schema {
2352                            key_schema: seed.key_schema.clone(),
2353                            value_schema: seed.value_schema.clone(),
2354                            key_reference_schemas: seed.key_reference_schemas.clone(),
2355                            value_reference_schemas: seed.value_reference_schemas.clone(),
2356                            csr_connection: Some(csr_connection),
2357                            confluent_wire_format: true,
2358                        }
2359                    } else {
2360                        unreachable!("CSR seed resolution should already have been called: Avro")
2361                    }
2362                }
2363            };
2364
2365            if let Some(key_schema) = key_schema {
2366                return Ok(SourceDataEncoding {
2367                    key: Some(DataEncoding::Avro(AvroEncoding {
2368                        schema: key_schema,
2369                        reference_schemas: key_reference_schemas,
2370                        csr_connection: csr_connection.clone(),
2371                        confluent_wire_format,
2372                    })),
2373                    value: DataEncoding::Avro(AvroEncoding {
2374                        schema: value_schema,
2375                        reference_schemas: value_reference_schemas,
2376                        csr_connection,
2377                        confluent_wire_format,
2378                    }),
2379                });
2380            } else {
2381                DataEncoding::Avro(AvroEncoding {
2382                    schema: value_schema,
2383                    reference_schemas: value_reference_schemas,
2384                    csr_connection,
2385                    confluent_wire_format,
2386                })
2387            }
2388        }
2389        Format::Protobuf(schema) => match schema {
2390            ProtobufSchema::Csr {
2391                csr_connection:
2392                    CsrConnectionProtobuf {
2393                        connection:
2394                            CsrConnection {
2395                                connection,
2396                                options,
2397                            },
2398                        seed,
2399                    },
2400            } => {
2401                if let Some(CsrSeedProtobuf { key, value }) = seed {
2402                    let item = scx.get_item_by_resolved_name(connection)?;
2403                    let _ = match item.connection()? {
2404                        Connection::Csr(connection) => connection,
2405                        _ => {
2406                            sql_bail!(
2407                                "{} is not a schema registry connection",
2408                                scx.catalog
2409                                    .resolve_full_name(item.name())
2410                                    .to_string()
2411                                    .quoted()
2412                            )
2413                        }
2414                    };
2415
2416                    if !options.is_empty() {
2417                        sql_bail!("Protobuf CSR connections do not support any options");
2418                    }
2419
2420                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2421                        descriptors: strconv::parse_bytes(&value.schema)?,
2422                        message_name: value.message_name.clone(),
2423                        confluent_wire_format: true,
2424                    });
2425                    if let Some(key) = key {
2426                        return Ok(SourceDataEncoding {
2427                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2428                                descriptors: strconv::parse_bytes(&key.schema)?,
2429                                message_name: key.message_name.clone(),
2430                                confluent_wire_format: true,
2431                            })),
2432                            value,
2433                        });
2434                    }
2435                    value
2436                } else {
2437                    unreachable!("CSR seed resolution should already have been called: Proto")
2438                }
2439            }
2440            ProtobufSchema::InlineSchema {
2441                message_name,
2442                schema: ast::Schema { schema },
2443            } => {
2444                let descriptors = strconv::parse_bytes(schema)?;
2445
2446                DataEncoding::Protobuf(ProtobufEncoding {
2447                    descriptors,
2448                    message_name: message_name.to_owned(),
2449                    confluent_wire_format: false,
2450                })
2451            }
2452        },
2453        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2454            regex: mz_repr::adt::regex::Regex::new(regex, false)
2455                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2456        }),
2457        Format::Csv { columns, delimiter } => {
2458            let columns = match columns {
2459                CsvColumns::Header { names } => {
2460                    if names.is_empty() {
2461                        sql_bail!("[internal error] column spec should get names in purify")
2462                    }
2463                    ColumnSpec::Header {
2464                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2465                    }
2466                }
2467                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2468            };
2469            DataEncoding::Csv(CsvEncoding {
2470                columns,
2471                delimiter: u8::try_from(*delimiter)
2472                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2473            })
2474        }
2475        Format::Json { array: false } => DataEncoding::Json,
2476        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2477        Format::Text => DataEncoding::Text,
2478    };
2479    Ok(SourceDataEncoding { key: None, value })
2480}
2481
2482/// Extract the key envelope, if it is requested
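///
/// For example (hypothetical), `INCLUDE KEY AS k` with a key format present
/// yields `KeyEnvelope::Named("k")`, while omitting `INCLUDE KEY` entirely
/// yields `KeyEnvelope::None`.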
2483fn get_key_envelope(
2484    included_items: &[SourceIncludeMetadata],
2485    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2486    key_envelope_no_encoding: bool,
2487) -> Result<KeyEnvelope, PlanError> {
2488    let key_definition = included_items
2489        .iter()
2490        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2491    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2492        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2493            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2494            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2495            (Some(name), _) if key_envelope_no_encoding => {
2496                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2497            }
2498            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2499            (_, None) => {
2500                // `alias == None` means `INCLUDE KEY`;
2501                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
2502                // Both cases make sense with the same error message.
2503                sql_bail!(
2504                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2505                        got bare FORMAT"
2506                );
2507            }
2508        }
2509    } else {
2510        Ok(KeyEnvelope::None)
2511    }
2512}
2513
2514/// Gets the key envelope for a given key encoding when no name for the key has
2515/// been requested by the user.
2516fn get_unnamed_key_envelope(
2517    key: Option<&DataEncoding<ReferencedConnection>>,
2518) -> Result<KeyEnvelope, PlanError> {
2519    // If the key is requested but comes from an unnamed type, then it gets the name "key".
2520    //
2521    // Otherwise it gets the names of the columns in the type
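    //
    // For example (hypothetical), a `FORMAT TEXT` key surfaces as a single
    // column named "key", whereas an Avro record key is flattened into one
    // column per record field.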
2522    let is_composite = match key {
2523        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2524        Some(
2525            DataEncoding::Avro(_)
2526            | DataEncoding::Csv(_)
2527            | DataEncoding::Protobuf(_)
2528            | DataEncoding::Regex { .. },
2529        ) => true,
2530        None => false,
2531    };
2532
2533    if is_composite {
2534        Ok(KeyEnvelope::Flattened)
2535    } else {
2536        Ok(KeyEnvelope::Named("key".to_string()))
2537    }
2538}
2539
2540pub fn describe_create_view(
2541    _: &StatementContext,
2542    _: CreateViewStatement<Aug>,
2543) -> Result<StatementDesc, PlanError> {
2544    Ok(StatementDesc::new(None))
2545}
2546
2547pub fn plan_view(
2548    scx: &StatementContext,
2549    def: &mut ViewDefinition<Aug>,
2550    temporary: bool,
2551) -> Result<(QualifiedItemName, View), PlanError> {
2552    let create_sql = normalize::create_statement(
2553        scx,
2554        Statement::CreateView(CreateViewStatement {
2555            if_exists: IfExistsBehavior::Error,
2556            temporary,
2557            definition: def.clone(),
2558        }),
2559    )?;
2560
2561    let ViewDefinition {
2562        name,
2563        columns,
2564        query,
2565    } = def;
2566
2567    let query::PlannedRootQuery {
2568        expr,
2569        mut desc,
2570        finishing,
2571        scope: _,
2572    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2573    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2574    // Note: Earlier, we considered persisting the finishing information with the view
2575    // here to help with database-issues#236. However, in the meantime, there might be a better
2576    // approach to solve database-issues#236:
2577    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2578    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2579        &finishing,
2580        expr.arity()
2581    ));
2582    if expr.contains_parameters()? {
2583        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2584    }
2585
2586    let dependencies = expr
2587        .depends_on()
2588        .into_iter()
2589        .map(|gid| scx.catalog.resolve_item_id(&gid))
2590        .collect();
2591
2592    let name = if temporary {
2593        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2594    } else {
2595        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2596    };
2597
2598    plan_utils::maybe_rename_columns(
2599        format!("view {}", scx.catalog.resolve_full_name(&name)),
2600        &mut desc,
2601        columns,
2602    )?;
2603    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2604
2605    if let Some(dup) = names.iter().duplicates().next() {
2606        sql_bail!("column {} specified more than once", dup.quoted());
2607    }
2608
2609    let view = View {
2610        create_sql,
2611        expr,
2612        dependencies,
2613        column_names: names,
2614        temporary,
2615    };
2616
2617    Ok((name, view))
2618}
2619
2620pub fn plan_create_view(
2621    scx: &StatementContext,
2622    mut stmt: CreateViewStatement<Aug>,
2623) -> Result<Plan, PlanError> {
2624    let CreateViewStatement {
2625        temporary,
2626        if_exists,
2627        definition,
2628    } = &mut stmt;
2629    let (name, view) = plan_view(scx, definition, *temporary)?;
2630
2631    // Override the statement-level IfExistsBehavior with Skip if this is
2632    // explicitly requested in the PlanContext (the default is `false`).
2633    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2634
2635    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2636        let if_exists = true;
2637        let cascade = false;
2638        let maybe_item_to_drop = plan_drop_item(
2639            scx,
2640            ObjectType::View,
2641            if_exists,
2642            definition.name.clone(),
2643            cascade,
2644        )?;
2645
2646        // Check if the new View depends on the item that we would be replacing.
2647        if let Some(id) = maybe_item_to_drop {
2648            let dependencies = view.expr.depends_on();
2649            let invalid_drop = scx
2650                .get_item(&id)
2651                .global_ids()
2652                .any(|gid| dependencies.contains(&gid));
2653            if invalid_drop {
2654                let item = scx.catalog.get_item(&id);
2655                sql_bail!(
2656                    "cannot replace view {0}: depended upon by new {0} definition",
2657                    scx.catalog.resolve_full_name(item.name())
2658                );
2659            }
2660
2661            Some(id)
2662        } else {
2663            None
2664        }
2665    } else {
2666        None
2667    };
2668    let drop_ids = replace
2669        .map(|id| {
2670            scx.catalog
2671                .item_dependents(id)
2672                .into_iter()
2673                .map(|id| id.unwrap_item_id())
2674                .collect()
2675        })
2676        .unwrap_or_default();
2677
2678    validate_view_dependencies(scx, &view.dependencies.0)?;
2679
2680    // Check for an object in the catalog with this same name
2681    let full_name = scx.catalog.resolve_full_name(&name);
2682    let partial_name = PartialItemName::from(full_name.clone());
2683    // For PostgreSQL compatibility, we need to prevent creating views when
2684    // there is an existing object *or* type of the same name.
2685    if let (Ok(item), IfExistsBehavior::Error, false) = (
2686        scx.catalog.resolve_item_or_type(&partial_name),
2687        *if_exists,
2688        ignore_if_exists_errors,
2689    ) {
2690        return Err(PlanError::ItemAlreadyExists {
2691            name: full_name.to_string(),
2692            item_type: item.item_type(),
2693        });
2694    }
2695
2696    Ok(Plan::CreateView(CreateViewPlan {
2697        name,
2698        view,
2699        replace,
2700        drop_ids,
2701        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2702        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2703    }))
2704}
2705
/// Validate the dependencies of a (materialized) view.
fn validate_view_dependencies(
    scx: &StatementContext,
    dependencies: &BTreeSet<CatalogItemId>,
) -> Result<(), PlanError> {
    for id in dependencies {
        let item = scx.catalog.get_item(id);
        if item.replacement_target().is_some() {
            let name = scx.catalog.minimal_qualification(item.name());
            return Err(PlanError::InvalidDependency {
                name: name.to_string(),
                item_type: format!("replacement {}", item.item_type()),
            });
        }
    }

    Ok(())
}

pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // We get back a trivial finishing, see comment in `plan_view`.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // (mz_now was purified away to a literal earlier)
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        // This limitation exists because we want Intervals to be cleanly
                        // convertible to a unix epoch timestamp difference. When the interval
                        // involves months, this is no longer true, because months have
                        // variable lengths. See `Timestamp::round_up`.
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    // Desugar the `aligned_to` expression
                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // (mz_now was purified away to a literal earlier)
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

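    // Hedged, illustrative examples (not from the original source) of the
    // REFRESH options planned above; purification has already turned
    // `mz_now()` into a literal and filled in a missing ALIGNED TO:
    //
    //     CREATE MATERIALIZED VIEW mv1
    //         WITH (REFRESH EVERY '1 day' ALIGNED TO '2024-01-01 00:00:00')
    //         AS SELECT ...;
    //
    //     CREATE MATERIALIZED VIEW mv2
    //         WITH (REFRESH AT mz_now(), REFRESH AT '2030-01-01 00:00:00')
    //         AS SELECT ...;
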
    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

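    // A hedged example (not from the original source) of the ASSERT NOT NULL
    // option resolved above; each named column must exist in the output and
    // may be listed only once:
    //
    //     CREATE MATERIALIZED VIEW mv
    //         WITH (ASSERT NOT NULL a, ASSERT NOT NULL b)
    //         AS SELECT a, b FROM t;
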
    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Override the statement-level IfExistsBehavior with Skip if this is
    // explicitly requested in the PlanContext (the default is `false`).
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            // Check if the new Materialized View depends on the item that we would be replacing.
            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Validate the replacement target, if one is given.
    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacement_for {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        // Check for dependency cycles.
        for dependent in scx.catalog.item_dependents(target.id()) {
            if let ObjectId::Item(id) = dependent
                && dependencies.contains(&id)
            {
                sql_bail!(
                    "replacement would cause {} to depend on itself",
                    scx.catalog.minimal_qualification(target.name()),
                );
            }
        }

        dependencies.insert(target.id());

        for use_id in target.used_by() {
            let use_item = scx.get_item(use_id);
            if use_item.replacement_target() == Some(target.id()) {
                sql_bail!(
                    "cannot replace {} because it already has a replacement: {}",
                    scx.catalog.minimal_qualification(target.name()),
                    scx.catalog.minimal_qualification(use_item.name()),
                );
            }
        }

        replacement_target = Some(target.id());
    }

    validate_view_dependencies(scx, &dependencies)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating materialized
    // views when there is an existing object *or* type of the same name.
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);

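// A hedged sketch (an assumption, not the macro's verbatim expansion) of the
// extractor that `generate_extracted_config!` derives for
// `MaterializedViewOption`, shaped like the hand-written
// `CsrConfigOptionExtracted` further below: `AllowMultiple` options collect
// into a `Vec`, the rest become optional fields, and a `TryFrom` impl rejects
// duplicate options.
//
//     pub struct MaterializedViewOptionExtracted {
//         seen: BTreeSet<MaterializedViewOptionName>,
//         pub assert_not_null: Vec<Ident>,
//         pub partition_by: Option<Vec<Ident>>,
//         pub retain_history: Option<OptionalDuration>,
//         pub refresh: Vec<RefreshOptionValue<Aug>>,
//     }
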
pub fn plan_create_continual_task(
    scx: &StatementContext,
    mut stmt: CreateContinualTaskStatement<Aug>,
) -> Result<Plan, PlanError> {
    match &stmt.sugar {
        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
        }
        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
        }
    };
    let cluster_id = match &stmt.in_cluster {
        None => scx.catalog.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;

    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;

    // It seems desirable for a CT that e.g. simply filters the input to keep
    // the same nullability. So, start by assuming all columns are non-nullable,
    // and then make them nullable below if any of the exprs plan them as
    // nullable.
    let mut desc = match stmt.columns {
        None => None,
        Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
            for col in columns.iter() {
                desc_columns.push((
                    normalize::column_name(col.name.clone()),
                    SqlColumnType {
                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
                        nullable: false,
                    },
                ));
            }
            Some(RelationDesc::from_names_and_types(desc_columns))
        }
    };
    let input = scx.get_item_by_resolved_name(&stmt.input)?;
    match input.item_type() {
        // Input must be a thing directly backed by a persist shard, so we can
        // use a persist listen to efficiently rehydrate.
        CatalogItemType::ContinualTask
        | CatalogItemType::Table
        | CatalogItemType::MaterializedView
        | CatalogItemType::Source => {}
        CatalogItemType::Sink
        | CatalogItemType::View
        | CatalogItemType::Index
        | CatalogItemType::Type
        | CatalogItemType::Func
        | CatalogItemType::Secret
        | CatalogItemType::Connection => {
            sql_bail!(
                "CONTINUAL TASK cannot use {} as an input",
                input.item_type()
            );
        }
    }

    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
    let ct_name = stmt.name;
    let placeholder_id = match &ct_name {
        ResolvedItemName::ContinualTask { id, name } => {
            let desc = match desc.as_ref().cloned() {
                Some(x) => x,
                None => {
                    // The user didn't specify the CT's columns. Take a wild
                    // guess that the CT has the same shape as the input. It's
                    // fine if this is wrong; we'll get an error below after
                    // planning the query.
                    let desc = input.relation_desc().expect("item type checked above");
                    desc.into_owned()
                }
            };
            qcx.ctes.insert(
                *id,
                CteDesc {
                    name: name.item.clone(),
                    desc,
                },
            );
            Some(*id)
        }
        _ => None,
    };

    let mut exprs = Vec::new();
    for (idx, stmt) in stmt.stmts.iter().enumerate() {
        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
        let query::PlannedRootQuery {
            expr,
            desc: desc_query,
            finishing,
            scope: _,
        } = query::plan_ct_query(&mut qcx, query)?;
        // We get back a trivial finishing because we plan with a "maintained"
        // QueryLifetime, see comment in `plan_view`.
        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
            &finishing,
            expr.arity()
        ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
        let expr = match desc.as_mut() {
            None => {
                desc = Some(desc_query);
                expr
            }
            Some(desc) => {
                // We specify the columns for DELETE, so if any column types don't
                // match, it's because it's an INSERT.
                if desc_query.arity() > desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more expressions than target columns",
                        idx
                    );
                }
                if desc_query.arity() < desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more target columns than expressions",
                        idx
                    );
                }
                // Ensure the types of the source query match the types of the target table,
                // installing assignment casts where necessary and possible.
                let target_types = desc.iter_types().map(|x| &x.scalar_type);
                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
                let expr = expr.map_err(|e| {
                    sql_err!(
                        "statement {}: column {} is of type {} but expression is of type {}",
                        idx,
                        desc.get_name(e.column).quoted(),
                        qcx.humanize_scalar_type(&e.target_type, false),
                        qcx.humanize_scalar_type(&e.source_type, false),
                    )
                })?;

                // Update CT nullability as necessary. The arity checks above
                // verified that the two types have the same length.
                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
                if updated {
                    let new_types = zip_types().map(|(ct, q)| {
                        let mut ct = ct.clone();
                        if q.nullable {
                            ct.nullable = true;
                        }
                        ct
                    });
                    *desc = RelationDesc::from_names_and_types(
                        desc.iter_names().cloned().zip_eq(new_types),
                    );
                }

                expr
            }
        };
        match stmt {
            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
        }
    }
    // TODO(ct3): Collect things by output and assert that there is only one (or
    // support multiple outputs).
    let expr = exprs
        .into_iter()
        .reduce(|acc, expr| acc.union(expr))
        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Check for an object in the catalog with this same name
    let name = match &ct_name {
        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
        ResolvedItemName::ContinualTask { name, .. } => {
            let name = scx.allocate_qualified_name(name.clone())?;
            let full_name = scx.catalog.resolve_full_name(&name);
            let partial_name = PartialItemName::from(full_name.clone());
            // For PostgreSQL compatibility, we need to prevent creating this when there
            // is an existing object *or* type of the same name.
            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
                return Err(PlanError::ItemAlreadyExists {
                    name: full_name.to_string(),
                    item_type: item.item_type(),
                });
            }
            name
        }
        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
        name,
        placeholder_id,
        desc,
        input_id: input.global_id(),
        with_snapshot: snapshot.unwrap_or(true),
        continual_task: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            replacement_target: None,
            cluster_id,
            non_null_assertions: Vec::new(),
            compaction_window: None,
            refresh_schedule: None,
            as_of,
        },
    }))
}

fn continual_task_query<'a>(
    ct_name: &ResolvedItemName,
    stmt: &'a ast::ContinualTaskStmt<Aug>,
) -> Option<ast::Query<Aug>> {
    match stmt {
        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
            table_name: _,
            columns,
            source,
            returning,
        }) => {
            if !columns.is_empty() || !returning.is_empty() {
                return None;
            }
            match source {
                ast::InsertSource::Query(query) => Some(query.clone()),
                ast::InsertSource::DefaultValues => None,
            }
        }
        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
            table_name: _,
            alias,
            using,
            selection,
        }) => {
            if !using.is_empty() {
                return None;
            }
            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
            let from = ast::TableWithJoins {
                relation: ast::TableFactor::Table {
                    name: ct_name.clone(),
                    alias: alias.clone(),
                },
                joins: Vec::new(),
            };
            let select = ast::Select {
                from: vec![from],
                selection: selection.clone(),
                distinct: None,
                projection: vec![ast::SelectItem::Wildcard],
                group_by: Vec::new(),
                having: None,
                qualify: None,
                options: Vec::new(),
            };
            let query = ast::Query {
                ctes: ast::CteBlock::Simple(Vec::new()),
                body: ast::SetExpr::Select(Box::new(select)),
                order_by: Vec::new(),
                limit: None,
                offset: None,
            };
            // Then negate it to turn it into retractions (after planning it).
            Some(query)
        }
    }
}

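// A hedged illustration (not from the original source) of the desugaring
// above: a `DELETE` statement in a continual task body such as
//
//     DELETE FROM ct WHERE val < 0;
//
// is rewritten to the query
//
//     SELECT * FROM ct WHERE val < 0;
//
// which `plan_create_continual_task` then negates to produce retractions.
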
generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));

pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);

pub fn plan_create_sink(
    scx: &StatementContext,
    stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Check for an object in the catalog with this same name
    let Some(name) = stmt.name.clone() else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    plan_sink(scx, stmt)
}

/// Plans a sink as if it does not exist in the catalog, so that the planning
/// logic can be reused by both CREATE SINK and ALTER SINK planning. It is the
/// responsibility of the callers (plan_create_sink and plan_alter_sink) to
/// check for name collisions where that matters.
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        mode,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let envelope = match (&connection, envelope, mode) {
        // Kafka sinks use ENVELOPE
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
            SinkEnvelope::Debezium
        }
        (CreateSinkConnection::Kafka { .. }, None, None) => {
            sql_bail!("ENVELOPE clause is required")
        }
        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
        }
        // Iceberg sinks use MODE
        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Iceberg { .. }, None, None) => {
            sql_bail!("MODE clause is required")
        }
        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
        }
    };

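    // Hedged, illustrative statements (not from the original source) for the
    // envelope/mode split above:
    //
    //     -- Kafka sinks take ENVELOPE:
    //     CREATE SINK s1 FROM t
    //         INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
    //         KEY (id) FORMAT JSON ENVELOPE UPSERT;
    //
    //     -- Iceberg sinks take MODE instead (surface syntax sketched from
    //     -- the options consumed by `iceberg_sink_builder` below):
    //     CREATE SINK s2 FROM t
    //         INTO ICEBERG CONNECTION iceberg_conn (TABLE 't', NAMESPACE 'ns')
    //         MODE UPSERT;
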
    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;

    {
        use CatalogItemType::*;
        match from.item_type() {
            Table | Source | MaterializedView | ContinualTask => {
                if from.replacement_target().is_some() {
                    let name = scx.catalog.minimal_qualification(from.name());
                    return Err(PlanError::InvalidSinkFrom {
                        name: name.to_string(),
                        item_type: format!("replacement {}", from.item_type()),
                    });
                }
            }
            Sink | View | Index | Type | Func | Secret | Connection => {
                let name = scx.catalog.minimal_qualification(from.name());
                return Err(PlanError::InvalidSinkFrom {
                    name: name.to_string(),
                    item_type: from.item_type().to_string(),
                });
            }
        }
    }

    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }

    let desc = from.relation_desc().expect("item type checked above");
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key: Some(key), .. }
        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
            let key_columns = key
                .key_columns
                .clone()
                .into_iter()
                .map(normalize::column_name)
                .collect::<Vec<_>>();
            let mut uniq = BTreeSet::new();
            for col in key_columns.iter() {
                if !uniq.insert(col) {
                    sql_bail!("duplicate column referenced in KEY: {}", col);
                }
            }
            let indices = key_columns
                .iter()
                .map(|col| -> anyhow::Result<usize> {
                    let name_idx =
                        desc.get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        // Return the error; a bare `sql_err!` expression here
                        // would be silently discarded.
                        return Err(
                            sql_err!("column referenced in KEY is ambiguous: {}", col).into()
                        );
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;
            let is_valid_key = desc
                .typ()
                .keys
                .iter()
                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));

            if !is_valid_key && envelope == SinkEnvelope::Upsert {
                if key.not_enforced {
                    scx.catalog
                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                            key: key_columns.clone(),
                            name: name.item.clone(),
                        })
                } else {
                    return Err(PlanError::UpsertSinkWithInvalidKey {
                        name: from_name.full_name_str(),
                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                        valid_keys: desc
                            .typ()
                            .keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    .map(|col| desc.get_name(*col).as_str().into())
                                    .collect()
                            })
                            .collect(),
                    });
                }
            }
            Some(indices)
        }
        CreateSinkConnection::Kafka { key: None, .. }
        | CreateSinkConnection::Iceberg { key: None, .. } => None,
    };

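    // A hedged example (not from the original source) of the NOT ENFORCED
    // escape hatch handled above: when the requested KEY is not a known
    // unique key of the sink's input relation, ENVELOPE UPSERT is an error
    // unless the user opts into the weaker guarantee, which downgrades the
    // error to the `UpsertSinkKeyNotEnforced` notice:
    //
    //     CREATE SINK s FROM t
    //         INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
    //         KEY (id) NOT ENFORCED
    //         FORMAT JSON ENVELOPE UPSERT;
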
    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                SqlScalarType::Map { value_type, .. }
                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

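    // A hedged illustration (not from the original source) of the HEADERS
    // option validated above; the named column must be unambiguous and have
    // type map[text => text] or map[text => bytea]:
    //
    //     CREATE SINK s FROM t
    //         INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
    //         KEY (id) HEADERS hdrs
    //         FORMAT JSON ENVELOPE UPSERT;
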
    // pick the first valid natural relation key, if any
    let relation_key_indices = desc.typ().keys.get(0).cloned();

    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = SqlRelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
        commit_interval,
    } = with_options.try_into()?;

    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
            commit_interval,
        )?,
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            options,
            ..
        } => iceberg_sink_builder(
            scx,
            connection,
            aws_connection,
            options,
            relation_key_indices,
            key_desc_and_indices,
            commit_interval,
        )?,
    };

    // WITH SNAPSHOT defaults to true
    let with_snapshot = snapshot.unwrap_or(true);
    // VERSION defaults to 0
    let version = version.unwrap_or(0);

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
            commit_interval,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}

fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");

    let existing_keys = desc
        .typ()
        .keys
        .iter()
        .map(|key_columns| {
            key_columns
                .iter()
                .map(|col| desc.get_name(*col).as_str())
                .join(", ")
        })
        .join(", ");

    sql_err!(
        "Key constraint ({}) conflicts with existing key ({})",
        user_keys,
        existing_keys
    )
}

/// Created by hand instead of with the `generate_extracted_config!` macro,
/// which doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    pub(crate) null_defaults: bool,
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}

impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}

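// A hedged example (not from the original source, and the exact option
// spelling is an assumption) of the schema-registry options extracted above;
// a `DOC ON` comment without a KEY or VALUE qualifier applies to both schemas
// unless a more specific option overrides it:
//
//     CREATE SINK s FROM t
//         INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
//         FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
//             (NULL DEFAULTS = true, DOC ON COLUMN t.a = 'the a column')
//         ENVELOPE DEBEZIUM;
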
fn iceberg_sink_builder(
    scx: &StatementContext,
    catalog_connection: ResolvedItemName,
    aws_connection: ResolvedItemName,
    options: Vec<IcebergSinkConfigOption<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
    let catalog_connection_id = catalog_connection_item.id();
    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
    let aws_connection_id = aws_connection_item.id();
    if !matches!(
        catalog_connection_item.connection()?,
        Connection::IcebergCatalog(_)
    ) {
        sql_bail!(
            "{} is not an iceberg catalog connection",
            scx.catalog
                .resolve_full_name(catalog_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
        sql_bail!(
            "{} is not an AWS connection",
            scx.catalog
                .resolve_full_name(aws_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    let IcebergSinkConfigOptionExtracted {
        table,
        namespace,
        seen: _,
    }: IcebergSinkConfigOptionExtracted = options.try_into()?;

    let Some(table) = table else {
        sql_bail!("Iceberg sink must specify TABLE");
    };
    let Some(namespace) = namespace else {
        sql_bail!("Iceberg sink must specify NAMESPACE");
    };
    if commit_interval.is_none() {
        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
    }

    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
        catalog_connection_id,
        catalog_connection: catalog_connection_id,
        aws_connection_id,
        aws_connection: aws_connection_id,
        table,
        namespace,
        relation_key_indices,
        key_desc_and_indices,
    }))
}

fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // Get the Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the sink.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Helper method to parse avro connection options for format specifiers that use avro
    // for either key or value encoding.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

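    // A hedged example (not from the original source) of overriding the
    // generated Avro schema names checked above; when the sink has a KEY, the
    // two FULLNAME options must be given together:
    //
    //     FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
    //         (AVRO KEY FULLNAME 'com.example.Key',
    //          AVRO VALUE FULLNAME 'com.example.Envelope')
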
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

4093    let partition_by = match &partition_by {
4094        Some(partition_by) => {
4095            let mut scope = Scope::from_source(None, value_desc.iter_names());
4096
4097            match envelope {
4098                SinkEnvelope::Upsert => (),
4099                SinkEnvelope::Debezium => {
4100                    let key_indices: HashSet<_> = key_desc_and_indices
4101                        .as_ref()
4102                        .map(|(_desc, indices)| indices.as_slice())
4103                        .unwrap_or_default()
4104                        .into_iter()
4105                        .collect();
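                    // Under ENVELOPE DEBEZIUM, PARTITION BY may reference only
                    // key columns; mark every non-key column so that any
                    // reference to it raises a targeted error.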
4106                    for (i, item) in scope.items.iter_mut().enumerate() {
4107                        if !key_indices.contains(&i) {
4108                            item.error_if_referenced = Some(|_table, column| {
4109                                PlanError::InvalidPartitionByEnvelopeDebezium {
4110                                    column_name: column.to_string(),
4111                                }
4112                            });
4113                        }
4114                    }
4115                }
4116            };
4117
4118            let ecx = &ExprContext {
4119                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
4120                name: "PARTITION BY",
4121                scope: &scope,
4122                relation_type: value_desc.typ(),
4123                allow_aggregates: false,
4124                allow_subqueries: false,
4125                allow_parameters: false,
4126                allow_windows: false,
4127            };
4128            let expr = plan_expr(ecx, partition_by)?.cast_to(
4129                ecx,
4130                CastContext::Assignment,
4131                &SqlScalarType::UInt64,
4132            )?;
4133            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
4134
4135            Some(expr)
4136        }
4137        _ => None,
4138    };
4139
4140    // Map from the format specifier of the statement to the individual key/value formats for the sink.
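    // A bare `FORMAT f` applies to both the key (if present) and the value,
    // while `KEY FORMAT k VALUE FORMAT v` sets the two independently.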
4141    let format = match format {
4142        Some(FormatSpecifier::KeyValue { key, value }) => {
4143            let key_format = match key_desc_and_indices.as_ref() {
4144                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4145                None => None,
4146            };
4147            KafkaSinkFormat {
4148                value_format: map_format(value, &value_desc, false)?,
4149                key_format,
4150            }
4151        }
4152        Some(FormatSpecifier::Bare(format)) => {
4153            let key_format = match key_desc_and_indices.as_ref() {
4154                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4155                None => None,
4156            };
4157            KafkaSinkFormat {
4158                value_format: map_format(format, &value_desc, false)?,
4159                key_format,
4160            }
4161        }
4162        None => bail_unsupported!("sink without format"),
4163    };
4164
4165    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4166        connection_id,
4167        connection: connection_id,
4168        format,
4169        topic: topic_name,
4170        relation_key_indices,
4171        key_desc_and_indices,
4172        headers_index,
4173        value_desc,
4174        partition_by,
4175        compression_type,
4176        progress_group_id,
4177        transactional_id,
4178        topic_options: KafkaTopicOptions {
4179            partition_count: topic_partition_count,
4180            replication_factor: topic_replication_factor,
4181            topic_config: topic_config.unwrap_or_default(),
4182        },
4183        topic_metadata_refresh_interval,
4184    }))
4185}
4186
4187pub fn describe_create_index(
4188    _: &StatementContext,
4189    _: CreateIndexStatement<Aug>,
4190) -> Result<StatementDesc, PlanError> {
4191    Ok(StatementDesc::new(None))
4192}
4193
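/// Plans a `CREATE INDEX` statement.
///
/// A sketch of the shapes this handles (illustrative; the parser is
/// authoritative):
///
/// ```sql
/// -- explicit key
/// CREATE INDEX t_a_idx ON t (a);
/// -- default index: the key is filled in from the relation's default key
/// CREATE DEFAULT INDEX ON t;
/// ```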
4194pub fn plan_create_index(
4195    scx: &StatementContext,
4196    mut stmt: CreateIndexStatement<Aug>,
4197) -> Result<Plan, PlanError> {
4198    let CreateIndexStatement {
4199        name,
4200        on_name,
4201        in_cluster,
4202        key_parts,
4203        with_options,
4204        if_not_exists,
4205    } = &mut stmt;
4206    let on = scx.get_item_by_resolved_name(on_name)?;
4207
4208    {
4209        use CatalogItemType::*;
4210        match on.item_type() {
4211            Table | Source | View | MaterializedView | ContinualTask => {
4212                if on.replacement_target().is_some() {
4213                    sql_bail!(
4214                        "index cannot be created on {} because it is a replacement {}",
4215                        on_name.full_name_str(),
4216                        on.item_type(),
4217                    );
4218                }
4219            }
4220            Sink | Index | Type | Func | Secret | Connection => {
4221                sql_bail!(
4222                    "index cannot be created on {} because it is a {}",
4223                    on_name.full_name_str(),
4224                    on.item_type(),
4225                );
4226            }
4227        }
4228    }
4229
4230    let on_desc = on.relation_desc().expect("item type checked above");
4231
4232    let filled_key_parts = match key_parts {
4233        Some(kp) => kp.to_vec(),
4234        None => {
4235            // `key_parts` is None if we're creating a "default" index.
4236            let key = on_desc.typ().default_key();
4237            key.iter()
4238                .map(|i| match on_desc.get_unambiguous_name(*i) {
4239                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
4240                    _ => Expr::Value(Value::Number((i + 1).to_string())),
4241                })
4242                .collect()
4243        }
4244    };
4245    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4246
4247    let index_name = if let Some(name) = name {
4248        QualifiedItemName {
4249            qualifiers: on.name().qualifiers.clone(),
4250            item: normalize::ident(name.clone()),
4251        }
4252    } else {
4253        let mut idx_name = QualifiedItemName {
4254            qualifiers: on.name().qualifiers.clone(),
4255            item: on.name().item.clone(),
4256        };
4257        if key_parts.is_none() {
4258            // We're trying to create the "default" index.
4259            idx_name.item += "_primary_idx";
4260        } else {
4261            // Use the PostgreSQL scheme for automatically naming indexes:
4262            // `<table>_<_-separated indexed expressions>_idx`
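            // For example, `CREATE INDEX ON t (a, b)` yields `t_a_b_idx`, and
            // each non-column expression contributes the literal `expr`.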
4263            let index_name_col_suffix = keys
4264                .iter()
4265                .map(|k| match k {
4266                    mz_expr::MirScalarExpr::Column(i, name) => {
4267                        match (on_desc.get_unambiguous_name(*i), &name.0) {
4268                            (Some(col_name), _) => col_name.to_string(),
4269                            (None, Some(name)) => name.to_string(),
4270                            (None, None) => format!("{}", i + 1),
4271                        }
4272                    }
4273                    _ => "expr".to_string(),
4274                })
4275                .join("_");
4276            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4277                .expect("write on strings cannot fail");
4278            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4279        }
4280
4281        if !*if_not_exists {
4282            scx.catalog.find_available_name(idx_name)
4283        } else {
4284            idx_name
4285        }
4286    };
4287
4288    // Check for an object in the catalog with this same name
4289    let full_name = scx.catalog.resolve_full_name(&index_name);
4290    let partial_name = PartialItemName::from(full_name.clone());
4291    // For PostgreSQL compatibility, we need to prevent creating indexes when
4292    // there is an existing object *or* type of the same name.
4293    //
4294    // Technically, we only need to prevent coexistence of indexes and types
4295    // that have an associated relation (record types but not list/map types).
4296    // Enforcing that would be more complicated, though. It's backwards
4297    // compatible to weaken this restriction in the future.
4298    if let (Ok(item), false, false) = (
4299        scx.catalog.resolve_item_or_type(&partial_name),
4300        *if_not_exists,
4301        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4302    ) {
4303        return Err(PlanError::ItemAlreadyExists {
4304            name: full_name.to_string(),
4305            item_type: item.item_type(),
4306        });
4307    }
4308
4309    let options = plan_index_options(scx, with_options.clone())?;
4310    let cluster_id = match in_cluster {
4311        None => scx.resolve_cluster(None)?.id(),
4312        Some(in_cluster) => in_cluster.id,
4313    };
4314
4315    *in_cluster = Some(ResolvedClusterName {
4316        id: cluster_id,
4317        print_name: None,
4318    });
4319
4320    // Normalize `stmt`.
4321    *name = Some(Ident::new(index_name.item.clone())?);
4322    *key_parts = Some(filled_key_parts);
4323    let if_not_exists = *if_not_exists;
4324
4325    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4326    let compaction_window = options.iter().find_map(|o| {
4327        #[allow(irrefutable_let_patterns)]
4328        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4329            Some(lcw.clone())
4330        } else {
4331            None
4332        }
4333    });
4334
4335    Ok(Plan::CreateIndex(CreateIndexPlan {
4336        name: index_name,
4337        index: Index {
4338            create_sql,
4339            on: on.global_id(),
4340            keys,
4341            cluster_id,
4342            compaction_window,
4343        },
4344        if_not_exists,
4345    }))
4346}
4347
4348pub fn describe_create_type(
4349    _: &StatementContext,
4350    _: CreateTypeStatement<Aug>,
4351) -> Result<StatementDesc, PlanError> {
4352    Ok(StatementDesc::new(None))
4353}
4354
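/// Plans a `CREATE TYPE` statement.
///
/// A sketch of the accepted shapes (illustrative; the parser is
/// authoritative):
///
/// ```sql
/// CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
/// CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);
/// ```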
4355pub fn plan_create_type(
4356    scx: &StatementContext,
4357    stmt: CreateTypeStatement<Aug>,
4358) -> Result<Plan, PlanError> {
4359    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4360    let CreateTypeStatement { name, as_type, .. } = stmt;
4361
4362    fn validate_data_type(
4363        scx: &StatementContext,
4364        data_type: ResolvedDataType,
4365        as_type: &str,
4366        key: &str,
4367    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4368        let (id, modifiers) = match data_type {
4369            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4370            _ => sql_bail!(
4371                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4372                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4373                as_type,
4374                key,
4375                data_type.human_readable_name(),
4376            ),
4377        };
4378
4379        let item = scx.catalog.get_item(&id);
4380        match item.type_details() {
4381            None => sql_bail!(
4382                "{} must be of class type, but received {} which is of class {}",
4383                key,
4384                scx.catalog.resolve_full_name(item.name()),
4385                item.item_type()
4386            ),
4387            Some(CatalogTypeDetails {
4388                typ: CatalogType::Char,
4389                ..
4390            }) => {
4391                bail_unsupported!("embedding char type in a list or map")
4392            }
4393            _ => {
4394                // Validate that the modifiers are actually valid.
4395                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4396
4397                Ok((id, modifiers))
4398            }
4399        }
4400    }
4401
4402    let inner = match as_type {
4403        CreateTypeAs::List { options } => {
4404            let CreateTypeListOptionExtracted {
4405                element_type,
4406                seen: _,
4407            } = CreateTypeListOptionExtracted::try_from(options)?;
4408            let element_type =
4409                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4410            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4411            CatalogType::List {
4412                element_reference: id,
4413                element_modifiers: modifiers,
4414            }
4415        }
4416        CreateTypeAs::Map { options } => {
4417            let CreateTypeMapOptionExtracted {
4418                key_type,
4419                value_type,
4420                seen: _,
4421            } = CreateTypeMapOptionExtracted::try_from(options)?;
4422            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4423            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4424            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4425            let (value_id, value_modifiers) =
4426                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4427            CatalogType::Map {
4428                key_reference: key_id,
4429                key_modifiers,
4430                value_reference: value_id,
4431                value_modifiers,
4432            }
4433        }
4434        CreateTypeAs::Record { column_defs } => {
4435            let mut fields = vec![];
4436            for column_def in column_defs {
4437                let data_type = column_def.data_type;
4438                let key = ident(column_def.name.clone());
4439                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4440                fields.push(CatalogRecordField {
4441                    name: ColumnName::from(key.clone()),
4442                    type_reference: id,
4443                    type_modifiers: modifiers,
4444                });
4445            }
4446            CatalogType::Record { fields }
4447        }
4448    };
4449
4450    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4451
4452    // Check for an object in the catalog with this same name
4453    let full_name = scx.catalog.resolve_full_name(&name);
4454    let partial_name = PartialItemName::from(full_name.clone());
4455    // For PostgreSQL compatibility, we need to prevent creating types when
4456    // there is an existing object *or* type of the same name.
4457    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4458        if item.item_type().conflicts_with_type() {
4459            return Err(PlanError::ItemAlreadyExists {
4460                name: full_name.to_string(),
4461                item_type: item.item_type(),
4462            });
4463        }
4464    }
4465
4466    Ok(Plan::CreateType(CreateTypePlan {
4467        name,
4468        typ: Type { create_sql, inner },
4469    }))
4470}
4471
4472generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4473
4474generate_extracted_config!(
4475    CreateTypeMapOption,
4476    (KeyType, ResolvedDataType),
4477    (ValueType, ResolvedDataType)
4478);
4479
4480#[derive(Debug)]
4481pub enum PlannedAlterRoleOption {
4482    Attributes(PlannedRoleAttributes),
4483    Variable(PlannedRoleVariable),
4484}
4485
4486#[derive(Debug, Clone)]
4487pub struct PlannedRoleAttributes {
4488    pub inherit: Option<bool>,
4489    pub password: Option<Password>,
4490    pub scram_iterations: Option<NonZeroU32>,
4491    /// `nopassword` is set to true if the password from the parser is None.
4492    /// This is semantically different than not supplying a password at all,
4493    /// to allow for unsetting a password.
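    /// For example (PostgreSQL-style syntax), `ALTER ROLE r WITH PASSWORD NULL`
    /// unsets the password rather than leaving it unchanged.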
4494    pub nopassword: Option<bool>,
4495    pub superuser: Option<bool>,
4496    pub login: Option<bool>,
4497}
4498
4499fn plan_role_attributes(
4500    options: Vec<RoleAttribute>,
4501    scx: &StatementContext,
4502) -> Result<PlannedRoleAttributes, PlanError> {
4503    let mut planned_attributes = PlannedRoleAttributes {
4504        inherit: None,
4505        password: None,
4506        scram_iterations: None,
4507        superuser: None,
4508        login: None,
4509        nopassword: None,
4510    };
4511
4512    for option in options {
4513        match option {
4514            RoleAttribute::Inherit | RoleAttribute::NoInherit
4515                if planned_attributes.inherit.is_some() =>
4516            {
4517                sql_bail!("conflicting or redundant options");
4518            }
4519            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4520                bail_never_supported!(
4521                    "CREATECLUSTER attribute",
4522                    "sql/create-role/#details",
4523                    "Use system privileges instead."
4524                );
4525            }
4526            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4527                bail_never_supported!(
4528                    "CREATEDB attribute",
4529                    "sql/create-role/#details",
4530                    "Use system privileges instead."
4531                );
4532            }
4533            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4534                bail_never_supported!(
4535                    "CREATEROLE attribute",
4536                    "sql/create-role/#details",
4537                    "Use system privileges instead."
4538                );
4539            }
4540            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4541                sql_bail!("conflicting or redundant options");
4542            }
4543
4544            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4545            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4546            RoleAttribute::Password(password) => {
4547                if let Some(password) = password {
4548                    planned_attributes.password = Some(password.into());
4549                    planned_attributes.scram_iterations =
4550                        Some(scx.catalog.system_vars().scram_iterations())
4551                } else {
4552                    planned_attributes.nopassword = Some(true);
4553                }
4554            }
4555            RoleAttribute::SuperUser => {
4556                if planned_attributes.superuser == Some(false) {
4557                    sql_bail!("conflicting or redundant options");
4558                }
4559                planned_attributes.superuser = Some(true);
4560            }
4561            RoleAttribute::NoSuperUser => {
4562                if planned_attributes.superuser == Some(true) {
4563                    sql_bail!("conflicting or redundant options");
4564                }
4565                planned_attributes.superuser = Some(false);
4566            }
4567            RoleAttribute::Login => {
4568                if planned_attributes.login == Some(false) {
4569                    sql_bail!("conflicting or redundant options");
4570                }
4571                planned_attributes.login = Some(true);
4572            }
4573            RoleAttribute::NoLogin => {
4574                if planned_attributes.login == Some(true) {
4575                    sql_bail!("conflicting or redundant options");
4576                }
4577                planned_attributes.login = Some(false);
4578            }
4579        }
4580    }
4581    if planned_attributes.inherit == Some(false) {
4582        bail_unsupported!("non inherit roles");
4583    }
4584
4585    Ok(planned_attributes)
4586}
4587
4588#[derive(Debug)]
4589pub enum PlannedRoleVariable {
4590    Set { name: String, value: VariableValue },
4591    Reset { name: String },
4592}
4593
4594impl PlannedRoleVariable {
4595    pub fn name(&self) -> &str {
4596        match self {
4597            PlannedRoleVariable::Set { name, .. } => name,
4598            PlannedRoleVariable::Reset { name } => name,
4599        }
4600    }
4601}
4602
4603fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4604    let plan = match variable {
4605        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4606            name: name.to_string(),
4607            value: scl::plan_set_variable_to(value)?,
4608        },
4609        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4610            name: name.to_string(),
4611        },
4612    };
4613    Ok(plan)
4614}
4615
4616pub fn describe_create_role(
4617    _: &StatementContext,
4618    _: CreateRoleStatement,
4619) -> Result<StatementDesc, PlanError> {
4620    Ok(StatementDesc::new(None))
4621}
4622
4623pub fn plan_create_role(
4624    scx: &StatementContext,
4625    CreateRoleStatement { name, options }: CreateRoleStatement,
4626) -> Result<Plan, PlanError> {
4627    let attributes = plan_role_attributes(options, scx)?;
4628    Ok(Plan::CreateRole(CreateRolePlan {
4629        name: normalize::ident(name),
4630        attributes: attributes.into(),
4631    }))
4632}
4633
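/// Plans a `CREATE NETWORK POLICY` statement, e.g. (illustrative):
///
/// ```sql
/// CREATE NETWORK POLICY office_policy (
///     RULES (
///         hq (action = 'allow', direction = 'ingress', address = '1.2.3.4/28')
///     )
/// );
/// ```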
4634pub fn plan_create_network_policy(
4635    ctx: &StatementContext,
4636    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4637) -> Result<Plan, PlanError> {
4638    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4639    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4640
4641    let Some(rule_defs) = policy_options.rules else {
4642        sql_bail!("RULES must be specified when creating network policies.");
4643    };
4644
4645    let mut rules = vec![];
4646    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4647        let NetworkPolicyRuleOptionExtracted {
4648            seen: _,
4649            direction,
4650            action,
4651            address,
4652        } = options.try_into()?;
4653        let (direction, action, address) = match (direction, action, address) {
4654            (Some(direction), Some(action), Some(address)) => (
4655                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4656                NetworkPolicyRuleAction::try_from(action.as_str())?,
4657                PolicyAddress::try_from(address.as_str())?,
4658            ),
4659            (_, _, _) => {
4660                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4661            }
4662        };
4663        rules.push(NetworkPolicyRule {
4664            name: normalize::ident(name),
4665            direction,
4666            action,
4667            address,
4668        });
4669    }
4670
4671    if rules.len()
4672        > ctx
4673            .catalog
4674            .system_vars()
4675            .max_rules_per_network_policy()
4676            .try_into()?
4677    {
4678        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4679    }
4680
4681    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4682        name: normalize::ident(name),
4683        rules,
4684    }))
4685}
4686
4687pub fn plan_alter_network_policy(
4688    ctx: &StatementContext,
4689    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4690) -> Result<Plan, PlanError> {
4691    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4692
4693    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4694    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4695
4696    let Some(rule_defs) = policy_options.rules else {
4697        sql_bail!("RULES must be specified when altering network policies.");
4698    };
4699
4700    let mut rules = vec![];
4701    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4702        let NetworkPolicyRuleOptionExtracted {
4703            seen: _,
4704            direction,
4705            action,
4706            address,
4707        } = options.try_into()?;
4708
4709        let (direction, action, address) = match (direction, action, address) {
4710            (Some(direction), Some(action), Some(address)) => (
4711                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4712                NetworkPolicyRuleAction::try_from(action.as_str())?,
4713                PolicyAddress::try_from(address.as_str())?,
4714            ),
4715            (_, _, _) => {
4716                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4717            }
4718        };
4719        rules.push(NetworkPolicyRule {
4720            name: normalize::ident(name),
4721            direction,
4722            action,
4723            address,
4724        });
4725    }
4726    if rules.len()
4727        > ctx
4728            .catalog
4729            .system_vars()
4730            .max_rules_per_network_policy()
4731            .try_into()?
4732    {
4733        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4734    }
4735
4736    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4737        id: policy.id(),
4738        name: normalize::ident(name),
4739        rules,
4740    }))
4741}
4742
4743pub fn describe_create_cluster(
4744    _: &StatementContext,
4745    _: CreateClusterStatement<Aug>,
4746) -> Result<StatementDesc, PlanError> {
4747    Ok(StatementDesc::new(None))
4748}
4749
4750// WARNING:
4751// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4752// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4753// that option stays the same. If you were to give a default value here, then not giving that option
4754// to ALTER CLUSTER would always reset the value of that option to the default.
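// For example, if `ReplicationFactor` defaulted to 1 here, an
// `ALTER CLUSTER c SET (SIZE = '...')` that never mentions REPLICATION FACTOR
// would silently reset c's replication factor to 1.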
4755generate_extracted_config!(
4756    ClusterOption,
4757    (AvailabilityZones, Vec<String>),
4758    (Disk, bool),
4759    (IntrospectionDebugging, bool),
4760    (IntrospectionInterval, OptionalDuration),
4761    (Managed, bool),
4762    (Replicas, Vec<ReplicaDefinition<Aug>>),
4763    (ReplicationFactor, u32),
4764    (Size, String),
4765    (Schedule, ClusterScheduleOptionValue),
4766    (WorkloadClass, OptionalString)
4767);
4768
4769generate_extracted_config!(
4770    NetworkPolicyOption,
4771    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4772);
4773
4774generate_extracted_config!(
4775    NetworkPolicyRuleOption,
4776    (Direction, String),
4777    (Action, String),
4778    (Address, String)
4779);
4780
4781generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4782
4783generate_extracted_config!(
4784    ClusterAlterUntilReadyOption,
4785    (Timeout, Duration),
4786    (OnTimeout, String)
4787);
4788
4789generate_extracted_config!(
4790    ClusterFeature,
4791    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4792    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4793    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4794    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4795    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4796    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4797    (
4798        EnableProjectionPushdownAfterRelationCse,
4799        Option<bool>,
4800        Default(None)
4801    )
4802);
4803
4804/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4805///
4806/// The reverse of [`unplan_create_cluster`].
4807pub fn plan_create_cluster(
4808    scx: &StatementContext,
4809    stmt: CreateClusterStatement<Aug>,
4810) -> Result<Plan, PlanError> {
4811    let plan = plan_create_cluster_inner(scx, stmt)?;
4812
4813    // Roundtrip through unplan and make sure that we end up with the same plan.
4814    if let CreateClusterVariant::Managed(_) = &plan.variant {
4815        let stmt = unplan_create_cluster(scx, plan.clone())
4816            .map_err(|e| PlanError::Replan(e.to_string()))?;
4817        let create_sql = stmt.to_ast_string_stable();
4818        let stmt = parse::parse(&create_sql)
4819            .map_err(|e| PlanError::Replan(e.to_string()))?
4820            .into_element()
4821            .ast;
4822        let (stmt, _resolved_ids) =
4823            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4824        let stmt = match stmt {
4825            Statement::CreateCluster(stmt) => stmt,
4826            stmt => {
4827                return Err(PlanError::Replan(format!(
4828                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4829                )));
4830            }
4831        };
4832        let replan =
4833            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4834        if plan != replan {
4835            return Err(PlanError::Replan(format!(
4836                "replan does not match: plan={plan:?}, replan={replan:?}"
4837            )));
4838        }
4839    }
4840
4841    Ok(Plan::CreateCluster(plan))
4842}
4843
4844pub fn plan_create_cluster_inner(
4845    scx: &StatementContext,
4846    CreateClusterStatement {
4847        name,
4848        options,
4849        features,
4850    }: CreateClusterStatement<Aug>,
4851) -> Result<CreateClusterPlan, PlanError> {
4852    let ClusterOptionExtracted {
4853        availability_zones,
4854        introspection_debugging,
4855        introspection_interval,
4856        managed,
4857        replicas,
4858        replication_factor,
4859        seen: _,
4860        size,
4861        disk,
4862        schedule,
4863        workload_class,
4864    }: ClusterOptionExtracted = options.try_into()?;
4865
4866    let managed = managed.unwrap_or_else(|| replicas.is_none());
4867
4868    if !scx.catalog.active_role_id().is_system() {
4869        if !features.is_empty() {
4870            sql_bail!("FEATURES not supported for non-system users");
4871        }
4872        if workload_class.is_some() {
4873            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4874        }
4875    }
4876
4877    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4878    let workload_class = workload_class.and_then(|v| v.0);
4879
4880    if managed {
4881        if replicas.is_some() {
4882            sql_bail!("REPLICAS not supported for managed clusters");
4883        }
4884        let Some(size) = size else {
4885            sql_bail!("SIZE must be specified for managed clusters");
4886        };
4887
4888        if disk.is_some() {
4889            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4890            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4891            // we'll be able to remove the `DISK` option entirely.
4892            if scx.catalog.is_cluster_size_cc(&size) {
4893                sql_bail!(
4894                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4895                );
4896            }
4897
4898            scx.catalog
4899                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4900        }
4901
4902        let compute = plan_compute_replica_config(
4903            introspection_interval,
4904            introspection_debugging.unwrap_or(false),
4905        )?;
4906
4907        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4908            replication_factor.unwrap_or_else(|| {
4909                scx.catalog
4910                    .system_vars()
4911                    .default_cluster_replication_factor()
4912            })
4913        } else {
4914            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4915            if replication_factor.is_some() {
4916                sql_bail!(
4917                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4918                );
4919            }
4920            // If we have a non-trivial schedule, start with no replicas, to avoid
4921            // quickly going back and forth if the schedule doesn't want a replica
4922            // right away.
4923            0
4924        };
4925        let availability_zones = availability_zones.unwrap_or_default();
4926
4927        if !availability_zones.is_empty() {
4928            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4929        }
4930
4931        // Plan OptimizerFeatureOverrides.
4932        let ClusterFeatureExtracted {
4933            reoptimize_imported_views,
4934            enable_eager_delta_joins,
4935            enable_new_outer_join_lowering,
4936            enable_variadic_left_join_lowering,
4937            enable_letrec_fixpoint_analysis,
4938            enable_join_prioritize_arranged,
4939            enable_projection_pushdown_after_relation_cse,
4940            seen: _,
4941        } = ClusterFeatureExtracted::try_from(features)?;
4942        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4943            reoptimize_imported_views,
4944            enable_eager_delta_joins,
4945            enable_new_outer_join_lowering,
4946            enable_variadic_left_join_lowering,
4947            enable_letrec_fixpoint_analysis,
4948            enable_join_prioritize_arranged,
4949            enable_projection_pushdown_after_relation_cse,
4950            ..Default::default()
4951        };
4952
4953        let schedule = plan_cluster_schedule(schedule)?;
4954
4955        Ok(CreateClusterPlan {
4956            name: normalize::ident(name),
4957            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4958                replication_factor,
4959                size,
4960                availability_zones,
4961                compute,
4962                optimizer_feature_overrides,
4963                schedule,
4964            }),
4965            workload_class,
4966        })
4967    } else {
4968        let Some(replica_defs) = replicas else {
4969            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4970        };
4971        if availability_zones.is_some() {
4972            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4973        }
4974        if replication_factor.is_some() {
4975            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4976        }
4977        if introspection_debugging.is_some() {
4978            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4979        }
4980        if introspection_interval.is_some() {
4981            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4982        }
4983        if size.is_some() {
4984            sql_bail!("SIZE not supported for unmanaged clusters");
4985        }
4986        if disk.is_some() {
4987            sql_bail!("DISK not supported for unmanaged clusters");
4988        }
4989        if !features.is_empty() {
4990            sql_bail!("FEATURES not supported for unmanaged clusters");
4991        }
4992        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4993            sql_bail!(
4994                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4995            );
4996        }
4997
4998        let mut replicas = vec![];
4999        for ReplicaDefinition { name, options } in replica_defs {
5000            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
5001        }
5002
5003        Ok(CreateClusterPlan {
5004            name: normalize::ident(name),
5005            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
5006            workload_class,
5007        })
5008    }
5009}
5010
5011/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
5012///
5013/// The reverse of [`plan_create_cluster`].
5014pub fn unplan_create_cluster(
5015    scx: &StatementContext,
5016    CreateClusterPlan {
5017        name,
5018        variant,
5019        workload_class,
5020    }: CreateClusterPlan,
5021) -> Result<CreateClusterStatement<Aug>, PlanError> {
5022    match variant {
5023        CreateClusterVariant::Managed(CreateClusterManagedPlan {
5024            replication_factor,
5025            size,
5026            availability_zones,
5027            compute,
5028            optimizer_feature_overrides,
5029            schedule,
5030        }) => {
5031            let schedule = unplan_cluster_schedule(schedule);
5032            let OptimizerFeatureOverrides {
5033                enable_guard_subquery_tablefunc: _,
5034                enable_consolidate_after_union_negate: _,
5035                enable_reduce_mfp_fusion: _,
5036                enable_cardinality_estimates: _,
5037                persist_fast_path_limit: _,
5038                reoptimize_imported_views,
5039                enable_eager_delta_joins,
5040                enable_new_outer_join_lowering,
5041                enable_variadic_left_join_lowering,
5042                enable_letrec_fixpoint_analysis,
5043                enable_join_prioritize_arranged,
5044                enable_projection_pushdown_after_relation_cse,
5045                enable_less_reduce_in_eqprop: _,
5046                enable_dequadratic_eqprop_map: _,
5047                enable_eq_classes_withholding_errors: _,
5048                enable_fast_path_plan_insights: _,
5049                enable_cast_elimination: _,
5050            } = optimizer_feature_overrides;
5051            // The fields ignored above (bound to `_`) are not wired up to cluster features.
5052            let features_extracted = ClusterFeatureExtracted {
5053                // Seen is ignored when unplanning.
5054                seen: Default::default(),
5055                reoptimize_imported_views,
5056                enable_eager_delta_joins,
5057                enable_new_outer_join_lowering,
5058                enable_variadic_left_join_lowering,
5059                enable_letrec_fixpoint_analysis,
5060                enable_join_prioritize_arranged,
5061                enable_projection_pushdown_after_relation_cse,
5062            };
5063            let features = features_extracted.into_values(scx.catalog);
5064            let availability_zones = if availability_zones.is_empty() {
5065                None
5066            } else {
5067                Some(availability_zones)
5068            };
5069            let (introspection_interval, introspection_debugging) =
5070                unplan_compute_replica_config(compute);
5071            // Replication factor cannot be explicitly specified with a refresh schedule;
5072            // it's always 1 or less.
5073            let replication_factor = match &schedule {
5074                ClusterScheduleOptionValue::Manual => Some(replication_factor),
5075                ClusterScheduleOptionValue::Refresh { .. } => {
5076                    assert!(
5077                        replication_factor <= 1,
5078                        "replication factor, {replication_factor:?}, must be <= 1"
5079                    );
5080                    None
5081                }
5082            };
5083            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
5084            let options_extracted = ClusterOptionExtracted {
5085                // Seen is ignored when unplanning.
5086                seen: Default::default(),
5087                availability_zones,
5088                disk: None,
5089                introspection_debugging: Some(introspection_debugging),
5090                introspection_interval,
5091                managed: Some(true),
5092                replicas: None,
5093                replication_factor,
5094                size: Some(size),
5095                schedule: Some(schedule),
5096                workload_class,
5097            };
5098            let options = options_extracted.into_values(scx.catalog);
5099            let name = Ident::new_unchecked(name);
5100            Ok(CreateClusterStatement {
5101                name,
5102                options,
5103                features,
5104            })
5105        }
5106        CreateClusterVariant::Unmanaged(_) => {
5107            bail_unsupported!("SHOW CREATE for unmanaged clusters")
5108        }
5109    }
5110}
5111
5112generate_extracted_config!(
5113    ReplicaOption,
5114    (AvailabilityZone, String),
5115    (BilledAs, String),
5116    (ComputeAddresses, Vec<String>),
5117    (ComputectlAddresses, Vec<String>),
5118    (Disk, bool),
5119    (Internal, bool, Default(false)),
5120    (IntrospectionDebugging, bool, Default(false)),
5121    (IntrospectionInterval, OptionalDuration),
5122    (Size, String),
5123    (StorageAddresses, Vec<String>),
5124    (StoragectlAddresses, Vec<String>),
5125    (Workers, u16)
5126);
5127
5128fn plan_replica_config(
5129    scx: &StatementContext,
5130    options: Vec<ReplicaOption<Aug>>,
5131) -> Result<ReplicaConfig, PlanError> {
5132    let ReplicaOptionExtracted {
5133        availability_zone,
5134        billed_as,
5135        computectl_addresses,
5136        disk,
5137        internal,
5138        introspection_debugging,
5139        introspection_interval,
5140        size,
5141        storagectl_addresses,
5142        ..
5143    }: ReplicaOptionExtracted = options.try_into()?;
5144
5145    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
5146
5147    match (
5148        size,
5149        availability_zone,
5150        billed_as,
5151        storagectl_addresses,
5152        computectl_addresses,
5153    ) {
5154        // Common cases we expect end users to hit.
5155        (None, _, None, None, None) => {
5156            // We don't mention the unmanaged options in the error message
5157            // because they are only available in unsafe mode.
5158            sql_bail!("SIZE option must be specified");
5159        }
5160        (Some(size), availability_zone, billed_as, None, None) => {
5161            if disk.is_some() {
5162                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
5163                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
5164                // we'll be able to remove the `DISK` option entirely.
5165                if scx.catalog.is_cluster_size_cc(&size) {
5166                    sql_bail!(
5167                        "DISK option not supported for modern cluster sizes because disk is always enabled"
5168                    );
5169                }
5170
5171                scx.catalog
5172                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5173            }
5174
5175            Ok(ReplicaConfig::Orchestrated {
5176                size,
5177                availability_zone,
5178                compute,
5179                billed_as,
5180                internal,
5181            })
5182        }
5183
5184        (None, None, None, storagectl_addresses, computectl_addresses) => {
5185            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5186
5187            // When manually testing Materialize in unsafe mode, it's easy to
5188            // accidentally omit one of these options, so we try to produce
5189            // helpful error messages.
5190            let Some(storagectl_addrs) = storagectl_addresses else {
5191                sql_bail!("missing STORAGECTL ADDRESSES option");
5192            };
5193            let Some(computectl_addrs) = computectl_addresses else {
5194                sql_bail!("missing COMPUTECTL ADDRESSES option");
5195            };
5196
5197            if storagectl_addrs.len() != computectl_addrs.len() {
5198                sql_bail!(
5199                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5200                );
5201            }
5202
5203            if disk.is_some() {
5204                sql_bail!("DISK can't be specified for unorchestrated clusters");
5205            }
5206
5207            Ok(ReplicaConfig::Unorchestrated {
5208                storagectl_addrs,
5209                computectl_addrs,
5210                compute,
5211            })
5212        }
5213        _ => {
5214            // We don't bother trying to produce a more helpful error message
5215            // here because no user is likely to hit this path.
5216            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5217        }
5218    }
5219}
5220
5221/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5222///
5223/// The reverse of [`unplan_compute_replica_config`].
5224fn plan_compute_replica_config(
5225    introspection_interval: Option<OptionalDuration>,
5226    introspection_debugging: bool,
5227) -> Result<ComputeReplicaConfig, PlanError> {
5228    let introspection_interval = introspection_interval
5229        .map(|OptionalDuration(i)| i)
5230        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
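    // An omitted INTROSPECTION INTERVAL therefore defaults to
    // `DEFAULT_REPLICA_LOGGING_INTERVAL`, while `OptionalDuration(None)`
    // (e.g. an explicit interval of 0) disables introspection entirely.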
5231    let introspection = match introspection_interval {
5232        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5233            interval,
5234            debugging: introspection_debugging,
5235        }),
5236        None if introspection_debugging => {
5237            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5238        }
5239        None => None,
5240    };
5241    let compute = ComputeReplicaConfig { introspection };
5242    Ok(compute)
5243}
5244
5245/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5246///
5247/// The reverse of [`plan_compute_replica_config`].
5248fn unplan_compute_replica_config(
5249    compute_replica_config: ComputeReplicaConfig,
5250) -> (Option<OptionalDuration>, bool) {
5251    match compute_replica_config.introspection {
5252        Some(ComputeReplicaIntrospectionConfig {
5253            debugging,
5254            interval,
5255        }) => (Some(OptionalDuration(Some(interval))), debugging),
5256        None => (Some(OptionalDuration(None)), false),
5257    }
5258}
5259
5260/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5261///
5262/// The reverse of [`unplan_cluster_schedule`].
5263fn plan_cluster_schedule(
5264    schedule: ClusterScheduleOptionValue,
5265) -> Result<ClusterSchedule, PlanError> {
5266    Ok(match schedule {
5267        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5268        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5269        ClusterScheduleOptionValue::Refresh {
5270            hydration_time_estimate: None,
5271        } => ClusterSchedule::Refresh {
5272            hydration_time_estimate: Duration::from_millis(0),
5273        },
5274        // Otherwise we convert the `IntervalValue` to a `Duration`.
5275        ClusterScheduleOptionValue::Refresh {
5276            hydration_time_estimate: Some(interval_value),
5277        } => {
5278            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5279            if interval.as_microseconds() < 0 {
5280                sql_bail!(
5281                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5282                    interval
5283                );
5284            }
5285            if interval.months != 0 {
5286                // This limitation is because we want this interval to be cleanly convertible
5287                // to a unix epoch timestamp difference. When the interval involves months, then
5288                // this is not true anymore, because months have variable lengths.
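                // For example, '1 month' can span 28 to 31 days, so it has no
                // fixed duration in milliseconds.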
5289                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5290            }
5291            let duration = interval.duration()?;
5292            if u64::try_from(duration.as_millis()).is_err()
5293                || Interval::from_duration(&duration).is_err()
5294            {
5295                sql_bail!("HYDRATION TIME ESTIMATE too large");
5296            }
5297            ClusterSchedule::Refresh {
5298                hydration_time_estimate: duration,
5299            }
5300        }
5301    })
5302}
5303
5304/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5305///
5306/// The reverse of [`plan_cluster_schedule`].
5307fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5308    match schedule {
5309        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5310        ClusterSchedule::Refresh {
5311            hydration_time_estimate,
5312        } => {
5313            let interval = Interval::from_duration(&hydration_time_estimate)
5314                .expect("planning ensured that this is convertible back to Interval");
5315            let interval_value = literal::unplan_interval(&interval);
5316            ClusterScheduleOptionValue::Refresh {
5317                hydration_time_estimate: Some(interval_value),
5318            }
5319        }
5320    }
5321}
5322
5323pub fn describe_create_cluster_replica(
5324    _: &StatementContext,
5325    _: CreateClusterReplicaStatement<Aug>,
5326) -> Result<StatementDesc, PlanError> {
5327    Ok(StatementDesc::new(None))
5328}
5329
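/// Plans a `CREATE CLUSTER REPLICA` statement, e.g. (illustrative):
///
/// ```sql
/// CREATE CLUSTER REPLICA c1.r2 SIZE = '100cc';
/// ```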
5330pub fn plan_create_cluster_replica(
5331    scx: &StatementContext,
5332    CreateClusterReplicaStatement {
5333        definition: ReplicaDefinition { name, options },
5334        of_cluster,
5335    }: CreateClusterReplicaStatement<Aug>,
5336) -> Result<Plan, PlanError> {
5337    let cluster = scx
5338        .catalog
5339        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5340    let current_replica_count = cluster.replica_ids().iter().count();
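    // Clusters containing certain storage objects are limited to a single
    // replica; adding another replica to such a cluster must fail with a
    // targeted error.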
5341    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5342        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5343        return Err(PlanError::CreateReplicaFailStorageObjects {
5344            current_replica_count,
5345            internal_replica_count,
5346            hypothetical_replica_count: current_replica_count + 1,
5347        });
5348    }
5349
5350    let config = plan_replica_config(scx, options)?;
5351
5352    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5353        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5354            return Err(PlanError::MangedReplicaName(name.into_string()));
5355        }
5356    } else {
5357        ensure_cluster_is_not_managed(scx, cluster.id())?;
5358    }
5359
5360    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5361        name: normalize::ident(name),
5362        cluster_id: cluster.id(),
5363        config,
5364    }))
5365}
5366
5367pub fn describe_create_secret(
5368    _: &StatementContext,
5369    _: CreateSecretStatement<Aug>,
5370) -> Result<StatementDesc, PlanError> {
5371    Ok(StatementDesc::new(None))
5372}
5373
5374pub fn plan_create_secret(
5375    scx: &StatementContext,
5376    stmt: CreateSecretStatement<Aug>,
5377) -> Result<Plan, PlanError> {
5378    let CreateSecretStatement {
5379        name,
5380        if_not_exists,
5381        value,
5382    } = &stmt;
5383
5384    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5385    let mut create_sql_statement = stmt.clone();
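    // Redact the secret value before recording `create_sql`, so the plaintext
    // never appears in the catalog or in `SHOW CREATE` output.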
5386    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5387    let create_sql =
5388        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5389    let secret_as = query::plan_secret_as(scx, value.clone())?;
5390
5391    let secret = Secret {
5392        create_sql,
5393        secret_as,
5394    };
5395
5396    Ok(Plan::CreateSecret(CreateSecretPlan {
5397        name,
5398        secret,
5399        if_not_exists: *if_not_exists,
5400    }))
5401}
5402
5403pub fn describe_create_connection(
5404    _: &StatementContext,
5405    _: CreateConnectionStatement<Aug>,
5406) -> Result<StatementDesc, PlanError> {
5407    Ok(StatementDesc::new(None))
5408}
5409
5410generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5411
5412pub fn plan_create_connection(
5413    scx: &StatementContext,
5414    mut stmt: CreateConnectionStatement<Aug>,
5415) -> Result<Plan, PlanError> {
5416    let CreateConnectionStatement {
5417        name,
5418        connection_type,
5419        values,
5420        if_not_exists,
5421        with_options,
5422    } = stmt.clone();
5423    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5424    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5425    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5426
5427    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5428    if options.validate.is_some() {
5429        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5430    }
5431    let validate = match options.validate {
5432        Some(val) => val,
5433        None => {
5434            scx.catalog
5435                .system_vars()
5436                .enable_default_connection_validation()
5437                && details.to_connection().validate_by_default()
5438        }
5439    };
5440
5441    // Check for an object in the catalog with this same name
5442    let full_name = scx.catalog.resolve_full_name(&name);
5443    let partial_name = PartialItemName::from(full_name.clone());
5444    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5445        return Err(PlanError::ItemAlreadyExists {
5446            name: full_name.to_string(),
5447            item_type: item.item_type(),
5448        });
5449    }
5450
5451    // For SSH connections, overwrite the public key options based on the
5452    // connection details, in case we generated new keys during planning.
5453    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5454        stmt.values.retain(|v| {
5455            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5456        });
5457        stmt.values.push(ConnectionOption {
5458            name: ConnectionOptionName::PublicKey1,
5459            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5460        });
5461        stmt.values.push(ConnectionOption {
5462            name: ConnectionOptionName::PublicKey2,
5463            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5464        });
5465    }
5466    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5467
5468    let plan = CreateConnectionPlan {
5469        name,
5470        if_not_exists,
5471        connection: crate::plan::Connection {
5472            create_sql,
5473            details,
5474        },
5475        validate,
5476    };
5477    Ok(Plan::CreateConnection(plan))
5478}
5479
5480fn plan_drop_database(
5481    scx: &StatementContext,
5482    if_exists: bool,
5483    name: &UnresolvedDatabaseName,
5484    cascade: bool,
5485) -> Result<Option<DatabaseId>, PlanError> {
5486    Ok(match resolve_database(scx, name, if_exists)? {
5487        Some(database) => {
5488            if !cascade && database.has_schemas() {
5489                sql_bail!(
5490                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5491                    name,
5492                );
5493            }
5494            Some(database.id())
5495        }
5496        None => None,
5497    })
5498}
5499
5500pub fn describe_drop_objects(
5501    _: &StatementContext,
5502    _: DropObjectsStatement,
5503) -> Result<StatementDesc, PlanError> {
5504    Ok(StatementDesc::new(None))
5505}
5506
5507pub fn plan_drop_objects(
5508    scx: &mut StatementContext,
5509    DropObjectsStatement {
5510        object_type,
5511        if_exists,
5512        names,
5513        cascade,
5514    }: DropObjectsStatement,
5515) -> Result<Plan, PlanError> {
5516    assert_ne!(
5517        object_type,
5518        mz_sql_parser::ast::ObjectType::Func,
5519        "rejected in parser"
5520    );
5521    let object_type = object_type.into();
5522
5523    let mut referenced_ids = Vec::new();
5524    for name in names {
5525        let id = match &name {
5526            UnresolvedObjectName::Cluster(name) => {
5527                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5528            }
5529            UnresolvedObjectName::ClusterReplica(name) => {
5530                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5531            }
5532            UnresolvedObjectName::Database(name) => {
5533                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5534            }
5535            UnresolvedObjectName::Schema(name) => {
5536                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5537            }
5538            UnresolvedObjectName::Role(name) => {
5539                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5540            }
5541            UnresolvedObjectName::Item(name) => {
5542                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5543                    .map(ObjectId::Item)
5544            }
5545            UnresolvedObjectName::NetworkPolicy(name) => {
5546                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5547            }
5548        };
5549        match id {
5550            Some(id) => referenced_ids.push(id),
5551            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5552                name: name.to_ast_string_simple(),
5553                object_type,
5554            }),
5555        }
5556    }
5557    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5558
5559    Ok(Plan::DropObjects(DropObjectsPlan {
5560        referenced_ids,
5561        drop_ids,
5562        object_type,
5563    }))
5564}
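// Note the split above: `referenced_ids` holds only the objects named in the
// statement, while `drop_ids` is the transitive closure computed by
// `object_dependents`. For example (hypothetical names), `DROP SOURCE src
// CASCADE` references only `src`, but `drop_ids` also carries any views or
// indexes built on it.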
5565
5566fn plan_drop_schema(
5567    scx: &StatementContext,
5568    if_exists: bool,
5569    name: &UnresolvedSchemaName,
5570    cascade: bool,
5571) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5572    // Special case for mz_temp: with lazy temporary schema creation, the temp
5573    // schema may not exist yet, but we still need to return the correct error.
5574    // Check the schema name directly against MZ_TEMP_SCHEMA.
5575    let normalized = normalize::unresolved_schema_name(name.clone())?;
5576    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5577        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5578    }
5579
5580    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5581        Some((database_spec, schema_spec)) => {
5582            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5583                sql_bail!(
5584                    "cannot drop schema {name} because it is required by the database system",
5585                );
5586            }
5587            if let SchemaSpecifier::Temporary = schema_spec {
5588                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5589            }
5590            let schema = scx.get_schema(&database_spec, &schema_spec);
5591            if !cascade && schema.has_items() {
5592                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5593                sql_bail!(
5594                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5595                    full_schema_name
5596                );
5597            }
5598            Some((database_spec, schema_spec))
5599        }
5600        None => None,
5601    })
5602}
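// Illustrative outcomes of the checks above: `DROP SCHEMA mz_temp` is rejected
// as a temporary schema even before the schema physically exists, `DROP SCHEMA
// mz_catalog` is rejected because ambient schemas are required by the system,
// and dropping a non-empty user schema requires CASCADE.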
5603
5604fn plan_drop_role(
5605    scx: &StatementContext,
5606    if_exists: bool,
5607    name: &Ident,
5608) -> Result<Option<RoleId>, PlanError> {
5609    match scx.catalog.resolve_role(name.as_str()) {
5610        Ok(role) => {
5611            let id = role.id();
5612            if &id == scx.catalog.active_role_id() {
5613                sql_bail!("current role cannot be dropped");
5614            }
5615            for role in scx.catalog.get_roles() {
5616                for (member_id, grantor_id) in role.membership() {
5617                    if &id == grantor_id {
5618                        let member_role = scx.catalog.get_role(member_id);
5619                        sql_bail!(
5620                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5621                            name.as_str(),
5622                            role.name(),
5623                            member_role.name()
5624                        );
5625                    }
5626                }
5627            }
5628            Ok(Some(role.id()))
5629        }
5630        Err(_) if if_exists => Ok(None),
5631        Err(e) => Err(e.into()),
5632    }
5633}
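// Sketch of the grantor check above, with hypothetical roles: if `admin` ran
// `GRANT data_team TO alice`, then `admin` is recorded as the grantor of that
// membership, and `DROP ROLE admin` fails until the membership is revoked
// (e.g. `REVOKE data_team FROM alice`).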
5634
5635fn plan_drop_cluster(
5636    scx: &StatementContext,
5637    if_exists: bool,
5638    name: &Ident,
5639    cascade: bool,
5640) -> Result<Option<ClusterId>, PlanError> {
5641    Ok(match resolve_cluster(scx, name, if_exists)? {
5642        Some(cluster) => {
5643            if !cascade && !cluster.bound_objects().is_empty() {
5644                return Err(PlanError::DependentObjectsStillExist {
5645                    object_type: "cluster".to_string(),
5646                    object_name: cluster.name().to_string(),
5647                    dependents: Vec::new(),
5648                });
5649            }
5650            Some(cluster.id())
5651        }
5652        None => None,
5653    })
5654}
5655
5656fn plan_drop_network_policy(
5657    scx: &StatementContext,
5658    if_exists: bool,
5659    name: &Ident,
5660) -> Result<Option<NetworkPolicyId>, PlanError> {
5661    match scx.catalog.resolve_network_policy(name.as_str()) {
5662        Ok(policy) => {
5663            // TODO(network_policy): When we support role based network policies, check if any role
5664            // currently has the specified policy set.
5665            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5666                Err(PlanError::NetworkPolicyInUse)
5667            } else {
5668                Ok(Some(policy.id()))
5669            }
5670        }
5671        Err(_) if if_exists => Ok(None),
5672        Err(e) => Err(e.into()),
5673    }
5674}
5675
5676/// Returns `true` if the cluster has any object that requires a single replica.
5677/// Returns `false` otherwise, including when the cluster has no objects.
5678fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5679    // If this feature is enabled, then all objects support multiple replicas.
5680    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5681        false
5682    } else {
5683        // Otherwise, check for the existence of sources or sinks.
5684        cluster.bound_objects().iter().any(|id| {
5685            let item = scx.catalog.get_item(id);
5686            matches!(
5687                item.item_type(),
5688                CatalogItemType::Sink | CatalogItemType::Source
5689            )
5690        })
5691    }
5692}
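// For example, with `ENABLE_MULTI_REPLICA_SOURCES` disabled, a cluster hosting
// a source or sink returns `true` here, pinning it to at most one replica;
// once the dyncfg is enabled, storage objects no longer constrain the replica
// count.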
5693
5694fn plan_drop_cluster_replica(
5695    scx: &StatementContext,
5696    if_exists: bool,
5697    name: &QualifiedReplica,
5698) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5699    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5700    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5701}
5702
5703/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5704fn plan_drop_item(
5705    scx: &StatementContext,
5706    object_type: ObjectType,
5707    if_exists: bool,
5708    name: UnresolvedItemName,
5709    cascade: bool,
5710) -> Result<Option<CatalogItemId>, PlanError> {
5711    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5712        Ok(r) => r,
5713        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5714        Err(PlanError::MismatchedObjectType {
5715            name,
5716            is_type: ObjectType::MaterializedView,
5717            expected_type: ObjectType::View,
5718        }) => {
5719            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5720        }
5721        e => e?,
5722    };
5723
5724    Ok(match resolved {
5725        Some(catalog_item) => {
5726            if catalog_item.id().is_system() {
5727                sql_bail!(
5728                    "cannot drop {} {} because it is required by the database system",
5729                    catalog_item.item_type(),
5730                    scx.catalog.minimal_qualification(catalog_item.name()),
5731                );
5732            }
5733
5734            if !cascade {
5735                for id in catalog_item.used_by() {
5736                    let dep = scx.catalog.get_item(id);
5737                    if dependency_prevents_drop(object_type, dep) {
5738                        return Err(PlanError::DependentObjectsStillExist {
5739                            object_type: catalog_item.item_type().to_string(),
5740                            object_name: scx
5741                                .catalog
5742                                .minimal_qualification(catalog_item.name())
5743                                .to_string(),
5744                            dependents: vec![(
5745                                dep.item_type().to_string(),
5746                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5747                            )],
5748                        });
5749                    }
5750                }
5751                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5752                //  relies on the entry. Unfortunately, we don't have that information readily available.
5753            }
5754            Some(catalog_item.id())
5755        }
5756        None => None,
5757    })
5758}
5759
5760/// Does the dependency `dep` prevent a non-cascading drop of an object of type `object_type`?
5761fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5762    match object_type {
5763        ObjectType::Type => true,
5764        _ => match dep.item_type() {
5765            CatalogItemType::Func
5766            | CatalogItemType::Table
5767            | CatalogItemType::Source
5768            | CatalogItemType::View
5769            | CatalogItemType::MaterializedView
5770            | CatalogItemType::Sink
5771            | CatalogItemType::Type
5772            | CatalogItemType::Secret
5773            | CatalogItemType::Connection
5774            | CatalogItemType::ContinualTask => true,
5775            CatalogItemType::Index => false,
5776        },
5777    }
5778}
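// Consequence of the `Index => false` arm, with hypothetical names: `DROP VIEW
// v` succeeds without CASCADE even if an index is built on `v` (the index is
// swept up via `object_dependents`), whereas a dependent view such as
// `CREATE VIEW w AS SELECT * FROM v` blocks the drop until CASCADE is given.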
5779
5780pub fn describe_alter_index_options(
5781    _: &StatementContext,
5782    _: AlterIndexStatement<Aug>,
5783) -> Result<StatementDesc, PlanError> {
5784    Ok(StatementDesc::new(None))
5785}
5786
5787pub fn describe_drop_owned(
5788    _: &StatementContext,
5789    _: DropOwnedStatement<Aug>,
5790) -> Result<StatementDesc, PlanError> {
5791    Ok(StatementDesc::new(None))
5792}
5793
5794pub fn plan_drop_owned(
5795    scx: &StatementContext,
5796    drop: DropOwnedStatement<Aug>,
5797) -> Result<Plan, PlanError> {
5798    let cascade = drop.cascade();
5799    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5800    let mut drop_ids = Vec::new();
5801    let mut privilege_revokes = Vec::new();
5802    let mut default_privilege_revokes = Vec::new();
5803
5804    fn update_privilege_revokes(
5805        object_id: SystemObjectId,
5806        privileges: &PrivilegeMap,
5807        role_ids: &BTreeSet<RoleId>,
5808        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5809    ) {
5810        privilege_revokes.extend(iter::zip(
5811            iter::repeat(object_id),
5812            privileges
5813                .all_values()
5814                .filter(|privilege| role_ids.contains(&privilege.grantee))
5815                .cloned(),
5816        ));
5817    }
5818
5819    // Replicas
5820    for replica in scx.catalog.get_cluster_replicas() {
5821        if role_ids.contains(&replica.owner_id()) {
5822            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5823        }
5824    }
5825
5826    // Clusters
5827    for cluster in scx.catalog.get_clusters() {
5828        if role_ids.contains(&cluster.owner_id()) {
5829            // Note: CASCADE is not required for replicas.
5830            if !cascade {
5831                let non_owned_bound_objects: Vec<_> = cluster
5832                    .bound_objects()
5833                    .into_iter()
5834                    .map(|item_id| scx.catalog.get_item(item_id))
5835                    .filter(|item| !role_ids.contains(&item.owner_id()))
5836                    .collect();
5837                if !non_owned_bound_objects.is_empty() {
5838                    let names: Vec<_> = non_owned_bound_objects
5839                        .into_iter()
5840                        .map(|item| {
5841                            (
5842                                item.item_type().to_string(),
5843                                scx.catalog.resolve_full_name(item.name()).to_string(),
5844                            )
5845                        })
5846                        .collect();
5847                    return Err(PlanError::DependentObjectsStillExist {
5848                        object_type: "cluster".to_string(),
5849                        object_name: cluster.name().to_string(),
5850                        dependents: names,
5851                    });
5852                }
5853            }
5854            drop_ids.push(cluster.id().into());
5855        }
5856        update_privilege_revokes(
5857            SystemObjectId::Object(cluster.id().into()),
5858            cluster.privileges(),
5859            &role_ids,
5860            &mut privilege_revokes,
5861        );
5862    }
5863
5864    // Items
5865    for item in scx.catalog.get_items() {
5866        if role_ids.contains(&item.owner_id()) {
5867            if !cascade {
5868                // Checks if any items still depend on this one, returning an error if so.
5869                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5870                    let non_owned_dependencies: Vec<_> = used_by
5871                        .into_iter()
5872                        .map(|item_id| scx.catalog.get_item(item_id))
5873                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5874                        .filter(|item| !role_ids.contains(&item.owner_id()))
5875                        .collect();
5876                    if !non_owned_dependencies.is_empty() {
5877                        let names: Vec<_> = non_owned_dependencies
5878                            .into_iter()
5879                            .map(|item| {
5880                                let item_typ = item.item_type().to_string();
5881                                let item_name =
5882                                    scx.catalog.resolve_full_name(item.name()).to_string();
5883                                (item_typ, item_name)
5884                            })
5885                            .collect();
5886                        Err(PlanError::DependentObjectsStillExist {
5887                            object_type: item.item_type().to_string(),
5888                            object_name: scx
5889                                .catalog
5890                                .resolve_full_name(item.name())
5891                                .to_string(),
5893                            dependents: names,
5894                        })
5895                    } else {
5896                        Ok(())
5897                    }
5898                };
5899
5900                // When this item gets dropped it will also drop its progress source, so we need to
5901                // check the users of those.
5902                if let Some(id) = item.progress_id() {
5903                    let progress_item = scx.catalog.get_item(&id);
5904                    check_if_dependents_exist(progress_item.used_by())?;
5905                }
5906                check_if_dependents_exist(item.used_by())?;
5907            }
5908            drop_ids.push(item.id().into());
5909        }
5910        update_privilege_revokes(
5911            SystemObjectId::Object(item.id().into()),
5912            item.privileges(),
5913            &role_ids,
5914            &mut privilege_revokes,
5915        );
5916    }
5917
5918    // Schemas
5919    for schema in scx.catalog.get_schemas() {
5920        if !schema.id().is_temporary() {
5921            if role_ids.contains(&schema.owner_id()) {
5922                if !cascade {
5923                    let non_owned_dependencies: Vec<_> = schema
5924                        .item_ids()
5925                        .map(|item_id| scx.catalog.get_item(&item_id))
5926                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5927                        .filter(|item| !role_ids.contains(&item.owner_id()))
5928                        .collect();
5929                    if !non_owned_dependencies.is_empty() {
5930                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5931                        sql_bail!(
5932                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5933                            full_schema_name.to_string().quoted()
5934                        );
5935                    }
5936                }
5937                drop_ids.push((*schema.database(), *schema.id()).into())
5938            }
5939            update_privilege_revokes(
5940                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5941                schema.privileges(),
5942                &role_ids,
5943                &mut privilege_revokes,
5944            );
5945        }
5946    }
5947
5948    // Databases
5949    for database in scx.catalog.get_databases() {
5950        if role_ids.contains(&database.owner_id()) {
5951            if !cascade {
5952                let non_owned_schemas: Vec<_> = database
5953                    .schemas()
5954                    .into_iter()
5955                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5956                    .collect();
5957                if !non_owned_schemas.is_empty() {
5958                    sql_bail!(
5959                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5960                        database.name().quoted(),
5961                    );
5962                }
5963            }
5964            drop_ids.push(database.id().into());
5965        }
5966        update_privilege_revokes(
5967            SystemObjectId::Object(database.id().into()),
5968            database.privileges(),
5969            &role_ids,
5970            &mut privilege_revokes,
5971        );
5972    }
5973
5974    // System
5975    update_privilege_revokes(
5976        SystemObjectId::System,
5977        scx.catalog.get_system_privileges(),
5978        &role_ids,
5979        &mut privilege_revokes,
5980    );
5981
5982    for (default_privilege_object, default_privilege_acl_items) in
5983        scx.catalog.get_default_privileges()
5984    {
5985        for default_privilege_acl_item in default_privilege_acl_items {
5986            if role_ids.contains(&default_privilege_object.role_id)
5987                || role_ids.contains(&default_privilege_acl_item.grantee)
5988            {
5989                default_privilege_revokes.push((
5990                    default_privilege_object.clone(),
5991                    default_privilege_acl_item.clone(),
5992                ));
5993            }
5994        }
5995    }
5996
5997    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5998
5999    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
6000    if !system_ids.is_empty() {
6001        let mut owners = system_ids
6002            .into_iter()
6003            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
6004            .collect::<BTreeSet<_>>()
6005            .into_iter()
6006            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
6007        sql_bail!(
6008            "cannot drop objects owned by role {} because they are required by the database system",
6009            owners.join(", "),
6010        );
6011    }
6012
6013    Ok(Plan::DropOwned(DropOwnedPlan {
6014        role_ids: role_ids.into_iter().collect(),
6015        drop_ids,
6016        privilege_revokes,
6017        default_privilege_revokes,
6018    }))
6019}
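// End-to-end sketch with a hypothetical role: `DROP OWNED BY intern CASCADE`
// produces a plan that (1) drops every replica, cluster, item, schema, and
// database owned by `intern`, expanded through `object_dependents`, (2) revokes
// every privilege whose grantee is `intern`, including system privileges, and
// (3) removes default-privilege entries that name `intern` as role or grantee.
// It fails only if a system-owned object would be swept up in the drop.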
6020
6021fn plan_retain_history_option(
6022    scx: &StatementContext,
6023    retain_history: Option<OptionalDuration>,
6024) -> Result<Option<CompactionWindow>, PlanError> {
6025    if let Some(OptionalDuration(lcw)) = retain_history {
6026        Ok(Some(plan_retain_history(scx, lcw)?))
6027    } else {
6028        Ok(None)
6029    }
6030}
6031
6032// Converts a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
6033// `DisableCompaction`. An explicit zero duration is an internal error, because the
6034// `OptionalDuration` type already converts a zero duration into `None` during parsing. This
6035// function must not be called in the `RESET (RETAIN HISTORY)` path, which should be handled by
6036// the outer `Option<OptionalDuration>` being `None`.
6037fn plan_retain_history(
6038    scx: &StatementContext,
6039    lcw: Option<Duration>,
6040) -> Result<CompactionWindow, PlanError> {
6041    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
6042    match lcw {
6043        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
6044        // disable compaction), and should never occur here. Furthermore, some things actually do
6045        // break when this is set to real zero:
6046        // https://github.com/MaterializeInc/database-issues/issues/3798.
6047        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
6048            option_name: "RETAIN HISTORY".to_string(),
6049            err: Box::new(PlanError::Unstructured(
6050                "internal error: unexpectedly zero".to_string(),
6051            )),
6052        }),
6053        Some(duration) => {
6054            // Error if the duration is low and enable_unlimited_retain_history is not set (which
6055            // should only be possible during testing).
6056            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
6057                && scx
6058                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6059                    .is_err()
6060            {
6061                return Err(PlanError::RetainHistoryLow {
6062                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
6063                });
6064            }
6065            Ok(duration.try_into()?)
6066        }
6067        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Since disabling
6068        // compaction is a dangerous default, require the unlimited-retain-history flag for it.
6069        None => {
6070            if scx
6071                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6072                .is_err()
6073            {
6074                Err(PlanError::RetainHistoryRequired)
6075            } else {
6076                Ok(CompactionWindow::DisableCompaction)
6077            }
6078        }
6079    }
6080}
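// The mapping above in SQL terms (durations illustrative; the whole option is
// behind ENABLE_LOGICAL_COMPACTION_WINDOW):
//
//     ... WITH (RETAIN HISTORY FOR '1 hour')        -- Ok: fixed compaction window
//     ... WITH (RETAIN HISTORY FOR '1 millisecond') -- errors when below the default
//                                                   -- window, unless unlimited retain
//                                                   -- history is enabled
//     ... WITH (RETAIN HISTORY FOR '0')             -- parsed to `None` by `OptionalDuration`:
//                                                   -- maps to `DisableCompaction`, behind
//                                                   -- the same unlimited flag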
6081
6082generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6083
6084fn plan_index_options(
6085    scx: &StatementContext,
6086    with_opts: Vec<IndexOption<Aug>>,
6087) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6088    if !with_opts.is_empty() {
6089        // Index options are not durable.
6090        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6091    }
6092
6093    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6094
6095    let mut out = Vec::with_capacity(1);
6096    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6097        out.push(crate::plan::IndexOption::RetainHistory(cw));
6098    }
6099    Ok(out)
6100}
6101
6102generate_extracted_config!(
6103    TableOption,
6104    (PartitionBy, Vec<Ident>),
6105    (RetainHistory, OptionalDuration),
6106    (RedactedTest, String)
6107);
6108
6109fn plan_table_options(
6110    scx: &StatementContext,
6111    desc: &RelationDesc,
6112    with_opts: Vec<TableOption<Aug>>,
6113) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6114    let TableOptionExtracted {
6115        partition_by,
6116        retain_history,
6117        redacted_test,
6118        ..
6119    }: TableOptionExtracted = with_opts.try_into()?;
6120
6121    if let Some(partition_by) = partition_by {
6122        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6123        check_partition_by(desc, partition_by)?;
6124    }
6125
6126    if redacted_test.is_some() {
6127        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6128    }
6129
6130    let mut out = Vec::with_capacity(1);
6131    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6132        out.push(crate::plan::TableOption::RetainHistory(cw));
6133    }
6134    Ok(out)
6135}
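// Illustrative WITH options handled above (hypothetical table; both features
// are flag-gated):
//
//     CREATE TABLE t (key int, ts timestamp)
//     WITH (PARTITION BY (key), RETAIN HISTORY FOR '30 days');
//
// `PARTITION BY` is only validated against the relation description here; of
// the three options, only the retain-history window flows into the plan's
// `TableOption`s.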
6136
6137pub fn plan_alter_index_options(
6138    scx: &mut StatementContext,
6139    AlterIndexStatement {
6140        index_name,
6141        if_exists,
6142        action,
6143    }: AlterIndexStatement<Aug>,
6144) -> Result<Plan, PlanError> {
6145    let object_type = ObjectType::Index;
6146    match action {
6147        AlterIndexAction::ResetOptions(options) => {
6148            let mut options = options.into_iter();
6149            if let Some(opt) = options.next() {
6150                match opt {
6151                    IndexOptionName::RetainHistory => {
6152                        if options.next().is_some() {
6153                            sql_bail!("RETAIN HISTORY must be only option");
6154                        }
6155                        return alter_retain_history(
6156                            scx,
6157                            object_type,
6158                            if_exists,
6159                            UnresolvedObjectName::Item(index_name),
6160                            None,
6161                        );
6162                    }
6163                }
6164            }
6165            sql_bail!("expected option");
6166        }
6167        AlterIndexAction::SetOptions(options) => {
6168            let mut options = options.into_iter();
6169            if let Some(opt) = options.next() {
6170                match opt.name {
6171                    IndexOptionName::RetainHistory => {
6172                        if options.next().is_some() {
6173                            sql_bail!("RETAIN HISTORY must be only option");
6174                        }
6175                        return alter_retain_history(
6176                            scx,
6177                            object_type,
6178                            if_exists,
6179                            UnresolvedObjectName::Item(index_name),
6180                            opt.value,
6181                        );
6182                    }
6183                }
6184            }
6185            sql_bail!("expected option");
6186        }
6187    }
6188}
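// The two arms above each accept exactly one option; illustrative statements
// (hypothetical index name):
//
//     ALTER INDEX t_idx SET (RETAIN HISTORY FOR '30 days');
//     ALTER INDEX t_idx RESET (RETAIN HISTORY);
//
// An empty option list falls through to the "expected option" error in either
// arm.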
6189
6190pub fn describe_alter_cluster_set_options(
6191    _: &StatementContext,
6192    _: AlterClusterStatement<Aug>,
6193) -> Result<StatementDesc, PlanError> {
6194    Ok(StatementDesc::new(None))
6195}
6196
6197pub fn plan_alter_cluster(
6198    scx: &mut StatementContext,
6199    AlterClusterStatement {
6200        name,
6201        action,
6202        if_exists,
6203    }: AlterClusterStatement<Aug>,
6204) -> Result<Plan, PlanError> {
6205    let cluster = match resolve_cluster(scx, &name, if_exists)? {
6206        Some(entry) => entry,
6207        None => {
6208            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6209                name: name.to_ast_string_simple(),
6210                object_type: ObjectType::Cluster,
6211            });
6212
6213            return Ok(Plan::AlterNoop(AlterNoopPlan {
6214                object_type: ObjectType::Cluster,
6215            }));
6216        }
6217    };
6218
6219    let mut options: PlanClusterOption = Default::default();
6220    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6221
6222    match action {
6223        AlterClusterAction::SetOptions {
6224            options: set_options,
6225            with_options,
6226        } => {
6227            let ClusterOptionExtracted {
6228                availability_zones,
6229                introspection_debugging,
6230                introspection_interval,
6231                managed,
6232                replicas: replica_defs,
6233                replication_factor,
6234                seen: _,
6235                size,
6236                disk,
6237                schedule,
6238                workload_class,
6239            }: ClusterOptionExtracted = set_options.try_into()?;
6240
6241            if !scx.catalog.active_role_id().is_system() {
6242                if workload_class.is_some() {
6243                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6244                }
6245            }
6246
6247            match managed.unwrap_or_else(|| cluster.is_managed()) {
6248                true => {
6249                    let alter_strategy_extracted =
6250                        ClusterAlterOptionExtracted::try_from(with_options)?;
6251                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6252
6253                    match alter_strategy {
6254                        AlterClusterPlanStrategy::None => {}
6255                        _ => {
6256                            scx.require_feature_flag(
6257                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6258                            )?;
6259                        }
6260                    }
6261
6262                    if replica_defs.is_some() {
6263                        sql_bail!("REPLICAS not supported for managed clusters");
6264                    }
6265                    if schedule.is_some()
6266                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6267                    {
6268                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6269                    }
6270
6271                    if let Some(replication_factor) = replication_factor {
6272                        if schedule.is_some()
6273                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6274                        {
6275                            sql_bail!(
6276                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6277                            );
6278                        }
6279                        if let Some(current_schedule) = cluster.schedule() {
6280                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6281                                sql_bail!(
6282                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6283                                );
6284                            }
6285                        }
6286
6287                        let internal_replica_count =
6288                            cluster.replicas().iter().filter(|r| r.internal()).count();
6289                        let hypothetical_replica_count =
6290                            internal_replica_count + usize::cast_from(replication_factor);
6291
6292                        // Total number of replicas running is internal replicas
6293                        // + replication factor.
6294                        if contains_single_replica_objects(scx, cluster)
6295                            && hypothetical_replica_count > 1
6296                        {
6297                            return Err(PlanError::CreateReplicaFailStorageObjects {
6298                                current_replica_count: cluster.replica_ids().iter().count(),
6299                                internal_replica_count,
6300                                hypothetical_replica_count,
6301                            });
6302                        }
6303                    } else if alter_strategy.is_some() {
6304                        // Alter strategies other than `None` stand up pending replicas of the
6305                        // new configuration, which would violate the single-replica constraint
6306                        // for sources and sinks. If the cluster hosts any storage objects, fail.
6307                        let internal_replica_count =
6308                            cluster.replicas().iter().filter(|r| r.internal()).count();
6309                        let hypothetical_replica_count = internal_replica_count * 2;
6310                        if contains_single_replica_objects(scx, cluster) {
6311                            return Err(PlanError::CreateReplicaFailStorageObjects {
6312                                current_replica_count: cluster.replica_ids().iter().count(),
6313                                internal_replica_count,
6314                                hypothetical_replica_count,
6315                            });
6316                        }
6317                    }
6318                }
6319                false => {
6320                    if !alter_strategy.is_none() {
6321                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6322                    }
6323                    if availability_zones.is_some() {
6324                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6325                    }
6326                    if replication_factor.is_some() {
6327                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6328                    }
6329                    if introspection_debugging.is_some() {
6330                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6331                    }
6332                    if introspection_interval.is_some() {
6333                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6334                    }
6335                    if size.is_some() {
6336                        sql_bail!("SIZE not supported for unmanaged clusters");
6337                    }
6338                    if disk.is_some() {
6339                        sql_bail!("DISK not supported for unmanaged clusters");
6340                    }
6341                    if schedule.is_some()
6342                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6343                    {
6344                        sql_bail!(
6345                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6346                        );
6347                    }
6348                    if let Some(current_schedule) = cluster.schedule() {
6349                        if !matches!(current_schedule, ClusterSchedule::Manual)
6350                            && schedule.is_none()
6351                        {
6352                            sql_bail!(
6353                                "when switching a cluster to unmanaged, if the managed \
6354                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6355                                explicitly set the SCHEDULE to MANUAL"
6356                            );
6357                        }
6358                    }
6359                }
6360            }
6361
6362            let mut replicas = vec![];
6363            for ReplicaDefinition { name, options } in
6364                replica_defs.into_iter().flat_map(Vec::into_iter)
6365            {
6366                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6367            }
6368
6369            if let Some(managed) = managed {
6370                options.managed = AlterOptionParameter::Set(managed);
6371            }
6372            if let Some(replication_factor) = replication_factor {
6373                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6374            }
6375            if let Some(size) = &size {
6376                options.size = AlterOptionParameter::Set(size.clone());
6377            }
6378            if let Some(availability_zones) = availability_zones {
6379                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6380            }
6381            if let Some(introspection_debugging) = introspection_debugging {
6382                options.introspection_debugging =
6383                    AlterOptionParameter::Set(introspection_debugging);
6384            }
6385            if let Some(introspection_interval) = introspection_interval {
6386                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6387            }
6388            if disk.is_some() {
6389                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6390                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
6391                // we'll be able to remove the `DISK` option entirely.
6392                let size = size.as_deref().unwrap_or_else(|| {
6393                    cluster.managed_size().expect("cluster known to be managed")
6394                });
6395                if scx.catalog.is_cluster_size_cc(size) {
6396                    sql_bail!(
6397                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6398                    );
6399                }
6400
6401                scx.catalog
6402                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6403            }
6404            if !replicas.is_empty() {
6405                options.replicas = AlterOptionParameter::Set(replicas);
6406            }
6407            if let Some(schedule) = schedule {
6408                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6409            }
6410            if let Some(workload_class) = workload_class {
6411                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6412            }
6413        }
6414        AlterClusterAction::ResetOptions(reset_options) => {
6415            use AlterOptionParameter::Reset;
6416            use ClusterOptionName::*;
6417
6418            if !scx.catalog.active_role_id().is_system() {
6419                if reset_options.contains(&WorkloadClass) {
6420                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6421                }
6422            }
6423
6424            for option in reset_options {
6425                match option {
6426                    AvailabilityZones => options.availability_zones = Reset,
6427                    Disk => scx
6428                        .catalog
6429                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6430                    IntrospectionInterval => options.introspection_interval = Reset,
6431                    IntrospectionDebugging => options.introspection_debugging = Reset,
6432                    Managed => options.managed = Reset,
6433                    Replicas => options.replicas = Reset,
6434                    ReplicationFactor => options.replication_factor = Reset,
6435                    Size => options.size = Reset,
6436                    Schedule => options.schedule = Reset,
6437                    WorkloadClass => options.workload_class = Reset,
6438                }
6439            }
6440        }
6441    }
6442    Ok(Plan::AlterCluster(AlterClusterPlan {
6443        id: cluster.id(),
6444        name: cluster.name().to_string(),
6445        options,
6446        strategy: alter_strategy,
6447    }))
6448}
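// Worked example of the replica-count check above, with a hypothetical managed
// cluster: if `c` hosts a source, has one internal replica, and multi-replica
// sources are disabled, then `ALTER CLUSTER c SET (REPLICATION FACTOR 2)` is
// rejected with `CreateReplicaFailStorageObjects` and a hypothetical replica
// count of 3, since internal replicas count toward the total.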
6449
6450pub fn describe_alter_set_cluster(
6451    _: &StatementContext,
6452    _: AlterSetClusterStatement<Aug>,
6453) -> Result<StatementDesc, PlanError> {
6454    Ok(StatementDesc::new(None))
6455}
6456
6457pub fn plan_alter_item_set_cluster(
6458    scx: &StatementContext,
6459    AlterSetClusterStatement {
6460        if_exists,
6461        set_cluster: in_cluster_name,
6462        name,
6463        object_type,
6464    }: AlterSetClusterStatement<Aug>,
6465) -> Result<Plan, PlanError> {
6466    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6467
6468    let object_type = object_type.into();
6469
6470    // Prevent access to `SET CLUSTER` for unsupported objects.
6471    match object_type {
6472        ObjectType::MaterializedView => {}
6473        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6474            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6475        }
6476        _ => {
6477            bail_never_supported!(
6478                format!("ALTER {object_type} SET CLUSTER"),
6479                "sql/alter-set-cluster/",
6480                format!("{object_type} has no associated cluster")
6481            )
6482        }
6483    }
6484
6485    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6486
6487    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6488        Some(entry) => {
6489            let current_cluster = entry.cluster_id();
6490            let Some(current_cluster) = current_cluster else {
6491                sql_bail!("No cluster associated with {name}");
6492            };
6493
6494            if current_cluster == in_cluster.id() {
6495                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6496            } else {
6497                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6498                    id: entry.id(),
6499                    set_cluster: in_cluster.id(),
6500                }))
6501            }
6502        }
6503        None => {
6504            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6505                name: name.to_ast_string_simple(),
6506                object_type,
6507            });
6508
6509            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6510        }
6511    }
6512}
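// Illustrative statement for the planning above (hypothetical names):
//
//     ALTER MATERIALIZED VIEW mv SET CLUSTER other_cluster;
//
// This plans to a no-op if `mv` already lives on `other_cluster`. For indexes,
// sinks, and sources the statement parses but is currently unsupported, and
// other object types are rejected outright.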
6513
6514pub fn describe_alter_object_rename(
6515    _: &StatementContext,
6516    _: AlterObjectRenameStatement,
6517) -> Result<StatementDesc, PlanError> {
6518    Ok(StatementDesc::new(None))
6519}
6520
6521pub fn plan_alter_object_rename(
6522    scx: &mut StatementContext,
6523    AlterObjectRenameStatement {
6524        name,
6525        object_type,
6526        to_item_name,
6527        if_exists,
6528    }: AlterObjectRenameStatement,
6529) -> Result<Plan, PlanError> {
6530    let object_type = object_type.into();
6531    match (object_type, name) {
6532        (
6533            ObjectType::View
6534            | ObjectType::MaterializedView
6535            | ObjectType::Table
6536            | ObjectType::Source
6537            | ObjectType::Index
6538            | ObjectType::Sink
6539            | ObjectType::Secret
6540            | ObjectType::Connection,
6541            UnresolvedObjectName::Item(name),
6542        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6543        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6544            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6545        }
6546        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6547            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6548        }
6549        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6550            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6551        }
6552        (object_type, name) => {
6553            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6554        }
6555    }
6556}
6557
6558pub fn plan_alter_schema_rename(
6559    scx: &mut StatementContext,
6560    name: UnresolvedSchemaName,
6561    to_schema_name: Ident,
6562    if_exists: bool,
6563) -> Result<Plan, PlanError> {
6564    // Special case for mz_temp: with lazy temporary schema creation, the temp
6565    // schema may not exist yet, but we still need to return the correct error.
6566    // Check the schema name directly against MZ_TEMP_SCHEMA.
6567    let normalized = normalize::unresolved_schema_name(name.clone())?;
6568    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6569        sql_bail!(
6570            "cannot rename schemas in the ambient database: {:?}",
6571            mz_repr::namespaces::MZ_TEMP_SCHEMA
6572        );
6573    }
6574
6575    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6576        let object_type = ObjectType::Schema;
6577        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6578            name: name.to_ast_string_simple(),
6579            object_type,
6580        });
6581        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6582    };
6583
6584    // Make sure the name is unique.
6585    if scx
6586        .resolve_schema_in_database(&db_spec, &to_schema_name)
6587        .is_ok()
6588    {
6589        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6590            to_schema_name.clone().into_string(),
6591        )));
6592    }
6593
6594    // Prevent users from renaming system related schemas.
6595    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6596    if schema.id().is_system() {
6597        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6598    }
6599
6600    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6601        cur_schema_spec: (db_spec, schema_spec),
6602        new_schema_name: to_schema_name.into_string(),
6603    }))
6604}
6605
6606pub fn plan_alter_schema_swap<F>(
6607    scx: &mut StatementContext,
6608    name_a: UnresolvedSchemaName,
6609    name_b: Ident,
6610    gen_temp_suffix: F,
6611) -> Result<Plan, PlanError>
6612where
6613    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6614{
6615    // Special case for mz_temp: with lazy temporary schema creation, the temp
6616    // schema may not exist yet, but we still need to return the correct error.
6617    // Check the schema name directly against MZ_TEMP_SCHEMA.
6618    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6619    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6620    {
6621        sql_bail!("cannot swap schemas that are in the ambient database");
6622    }
6623    // Also check name_b (the target schema name)
6624    let name_b_str = normalize::ident_ref(&name_b);
6625    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6626        sql_bail!("cannot swap schemas that are in the ambient database");
6627    }
6628
6629    let schema_a = scx.resolve_schema(name_a.clone())?;
6630
6631    let db_spec = schema_a.database().clone();
6632    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6633        sql_bail!("cannot swap schemas that are in the ambient database");
6634    };
6635    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6636
6637    // We cannot swap system schemas.
6638    if schema_a.id().is_system() || schema_b.id().is_system() {
6639        bail_never_supported!("swapping a system schema".to_string())
6640    }
6641
6642    // Generate a temporary name we can swap schema_a to.
6643    //
6644    // `check` returns whether the candidate temp schema name is free to use.
6645    let check = |temp_suffix: &str| {
6646        let mut temp_name = ident!("mz_schema_swap_");
6647        temp_name.append_lossy(temp_suffix);
6648        scx.resolve_schema_in_database(&db_spec, &temp_name)
6649            .is_err()
6650    };
6651    let temp_suffix = gen_temp_suffix(&check)?;
6652    let name_temp = format!("mz_schema_swap_{temp_suffix}");
6653
6654    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6655        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6656        schema_a_name: schema_a.name().schema.to_string(),
6657        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6658        schema_b_name: schema_b.name().schema.to_string(),
6659        name_temp,
6660    }))
6661}
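// Typical blue/green flow served by this plan, with hypothetical schema names:
// stage new definitions in `staging`, then promote them atomically with
//
//     ALTER SCHEMA prod SWAP WITH staging;
//
// Internally, `prod` is first renamed to the reserved
// `mz_schema_swap_<suffix>` name so that the two renames never collide.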
6662
6663pub fn plan_alter_item_rename(
6664    scx: &mut StatementContext,
6665    object_type: ObjectType,
6666    name: UnresolvedItemName,
6667    to_item_name: Ident,
6668    if_exists: bool,
6669) -> Result<Plan, PlanError> {
6670    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6671        Ok(r) => r,
6672        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6673        Err(PlanError::MismatchedObjectType {
6674            name,
6675            is_type: ObjectType::MaterializedView,
6676            expected_type: ObjectType::View,
6677        }) => {
6678            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6679        }
6680        e => e?,
6681    };
6682
6683    match resolved {
6684        Some(entry) => {
6685            let full_name = scx.catalog.resolve_full_name(entry.name());
6686            let item_type = entry.item_type();
6687
6688            let proposed_name = QualifiedItemName {
6689                qualifiers: entry.name().qualifiers.clone(),
6690                item: to_item_name.clone().into_string(),
6691            };
6692
6693            // For PostgreSQL compatibility, items and types cannot have
6694            // overlapping names in a variety of situations. See the comment on
6695            // `CatalogItemType::conflicts_with_type` for details.
6696            let conflicting_type_exists;
6697            let conflicting_item_exists;
6698            if item_type == CatalogItemType::Type {
6699                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6700                conflicting_item_exists = scx
6701                    .catalog
6702                    .get_item_by_name(&proposed_name)
6703                    .map(|item| item.item_type().conflicts_with_type())
6704                    .unwrap_or(false);
6705            } else {
6706                conflicting_type_exists = item_type.conflicts_with_type()
6707                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6708                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6709            };
6710            if conflicting_type_exists || conflicting_item_exists {
6711                sql_bail!("catalog item '{}' already exists", to_item_name);
6712            }
6713
6714            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6715                id: entry.id(),
6716                current_full_name: full_name,
6717                to_name: normalize::ident(to_item_name),
6718                object_type,
6719            }))
6720        }
6721        None => {
6722            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6723                name: name.to_ast_string_simple(),
6724                object_type,
6725            });
6726
6727            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6728        }
6729    }
6730}
6731
6732pub fn plan_alter_cluster_rename(
6733    scx: &mut StatementContext,
6734    object_type: ObjectType,
6735    name: Ident,
6736    to_name: Ident,
6737    if_exists: bool,
6738) -> Result<Plan, PlanError> {
6739    match resolve_cluster(scx, &name, if_exists)? {
6740        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6741            id: entry.id(),
6742            name: entry.name().to_string(),
6743            to_name: ident(to_name),
6744        })),
6745        None => {
6746            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6747                name: name.to_ast_string_simple(),
6748                object_type,
6749            });
6750
6751            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6752        }
6753    }
6754}
6755
6756pub fn plan_alter_cluster_swap<F>(
6757    scx: &mut StatementContext,
6758    name_a: Ident,
6759    name_b: Ident,
6760    gen_temp_suffix: F,
6761) -> Result<Plan, PlanError>
6762where
6763    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6764{
6765    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6766    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6767
6768    let check = |temp_suffix: &str| {
6769        let mut temp_name = ident!("mz_cluster_swap_");
6770        temp_name.append_lossy(temp_suffix);
6771        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6772            // Temp name does not exist, so we can use it.
6773            Err(CatalogError::UnknownCluster(_)) => true,
6774            // Temp name already exists!
6775            Ok(_) | Err(_) => false,
6776        }
6777    };
6778    let temp_suffix = gen_temp_suffix(&check)?;
6779    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6780
6781    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6782        id_a: cluster_a.id(),
6783        id_b: cluster_b.id(),
6784        name_a: name_a.into_string(),
6785        name_b: name_b.into_string(),
6786        name_temp,
6787    }))
6788}
6789
6790pub fn plan_alter_cluster_replica_rename(
6791    scx: &mut StatementContext,
6792    object_type: ObjectType,
6793    name: QualifiedReplica,
6794    to_item_name: Ident,
6795    if_exists: bool,
6796) -> Result<Plan, PlanError> {
6797    match resolve_cluster_replica(scx, &name, if_exists)? {
6798        Some((cluster, replica)) => {
6799            ensure_cluster_is_not_managed(scx, cluster.id())?;
6800            Ok(Plan::AlterClusterReplicaRename(
6801                AlterClusterReplicaRenamePlan {
6802                    cluster_id: cluster.id(),
6803                    replica_id: replica,
6804                    name: QualifiedReplica {
6805                        cluster: Ident::new(cluster.name())?,
6806                        replica: name.replica,
6807                    },
6808                    to_name: normalize::ident(to_item_name),
6809                },
6810            ))
6811        }
6812        None => {
6813            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6814                name: name.to_ast_string_simple(),
6815                object_type,
6816            });
6817
6818            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6819        }
6820    }
6821}
6822
6823pub fn describe_alter_object_swap(
6824    _: &StatementContext,
6825    _: AlterObjectSwapStatement,
6826) -> Result<StatementDesc, PlanError> {
6827    Ok(StatementDesc::new(None))
6828}
6829
6830pub fn plan_alter_object_swap(
6831    scx: &mut StatementContext,
6832    stmt: AlterObjectSwapStatement,
6833) -> Result<Plan, PlanError> {
6834    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6835
6836    let AlterObjectSwapStatement {
6837        object_type,
6838        name_a,
6839        name_b,
6840    } = stmt;
6841    let object_type = object_type.into();
6842
6843    // We'll try 10 times to generate a temporary suffix.
6844    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6845        let mut attempts = 0;
6846        let name_temp = loop {
6847            attempts += 1;
6848            if attempts > 10 {
6849                tracing::warn!("Unable to generate temp id for swapping");
6850                sql_bail!("unable to swap!");
6851            }
6852
6853            // Call the provided closure to make sure this name is unique!
6854            let short_id = mz_ore::id_gen::temp_id();
6855            if check_fn(&short_id) {
6856                break short_id;
6857            }
6858        };
6859
6860        Ok(name_temp)
6861    };
6862
6863    match (object_type, name_a, name_b) {
6864        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6865            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6866        }
6867        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6868            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6869        }
6870        (object_type, _, _) => Err(PlanError::Unsupported {
6871            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6872            discussion_no: None,
6873        }),
6874    }
6875}
6876
6877pub fn describe_alter_retain_history(
6878    _: &StatementContext,
6879    _: AlterRetainHistoryStatement<Aug>,
6880) -> Result<StatementDesc, PlanError> {
6881    Ok(StatementDesc::new(None))
6882}
6883
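/// Plans `ALTER ... SET (RETAIN HISTORY ...)` and the corresponding `RESET`,
/// which adjust how much compacted history is retained for the object. For
/// example (illustrative syntax):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '2h');
/// ```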
6884pub fn plan_alter_retain_history(
6885    scx: &StatementContext,
6886    AlterRetainHistoryStatement {
6887        object_type,
6888        if_exists,
6889        name,
6890        history,
6891    }: AlterRetainHistoryStatement<Aug>,
6892) -> Result<Plan, PlanError> {
6893    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6894}
6895
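/// Shared implementation of `ALTER ... RETAIN HISTORY`: checks that the
/// object type supports history retention, resolves the target item, and
/// translates the option value into a compaction window (a `None` value means
/// RESET, which restores the default window).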
6896fn alter_retain_history(
6897    scx: &StatementContext,
6898    object_type: ObjectType,
6899    if_exists: bool,
6900    name: UnresolvedObjectName,
6901    history: Option<WithOptionValue<Aug>>,
6902) -> Result<Plan, PlanError> {
6903    let name = match (object_type, name) {
6904        (
6905            // View gets a special error below.
6906            ObjectType::View
6907            | ObjectType::MaterializedView
6908            | ObjectType::Table
6909            | ObjectType::Source
6910            | ObjectType::Index,
6911            UnresolvedObjectName::Item(name),
6912        ) => name,
6913        (object_type, _) => {
6914            sql_bail!("{object_type} does not support RETAIN HISTORY")
6915        }
6916    };
6917    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6918        Some(entry) => {
6919            let full_name = scx.catalog.resolve_full_name(entry.name());
6920            let item_type = entry.item_type();
6921
6922            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6923            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6924                return Err(PlanError::AlterViewOnMaterializedView(
6925                    full_name.to_string(),
6926                ));
6927            } else if object_type == ObjectType::View {
6928                sql_bail!("{object_type} does not support RETAIN HISTORY")
6929            } else if object_type != item_type {
6930                sql_bail!(
6931                    "\"{}\" is a {} not a {}",
6932                    full_name,
6933                    entry.item_type(),
6934                    format!("{object_type}").to_lowercase()
6935                )
6936            }
6937
6938            // Save the original value so we can write it back into the item's persisted create_sql.
6939            let (value, lcw) = match &history {
6940                Some(WithOptionValue::RetainHistoryFor(value)) => {
6941                    let window = OptionalDuration::try_from_value(value.clone())?;
6942                    (Some(value.clone()), window.0)
6943                }
6944                // None is RESET, so use the default compaction window.
6945                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6946                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6947            };
6948            let window = plan_retain_history(scx, lcw)?;
6949
6950            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6951                id: entry.id(),
6952                value,
6953                window,
6954                object_type,
6955            }))
6956        }
6957        None => {
6958            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6959                name: name.to_ast_string_simple(),
6960                object_type,
6961            });
6962
6963            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6964        }
6965    }
6966}
6967
6968pub fn describe_alter_secret_options(
6969    _: &StatementContext,
6970    _: AlterSecretStatement<Aug>,
6971) -> Result<StatementDesc, PlanError> {
6972    Ok(StatementDesc::new(None))
6973}
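/// Plans `ALTER SECRET ... AS ...`, which re-plans the secret's value
/// expression and leaves everything else about the secret unchanged. For
/// example:
///
/// ```sql
/// ALTER SECRET kafka_password AS 'new-password';
/// ```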
6974
6975pub fn plan_alter_secret(
6976    scx: &mut StatementContext,
6977    stmt: AlterSecretStatement<Aug>,
6978) -> Result<Plan, PlanError> {
6979    let AlterSecretStatement {
6980        name,
6981        if_exists,
6982        value,
6983    } = stmt;
6984    let object_type = ObjectType::Secret;
6985    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6986        Some(entry) => entry.id(),
6987        None => {
6988            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6989                name: name.to_string(),
6990                object_type,
6991            });
6992
6993            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6994        }
6995    };
6996
6997    let secret_as = query::plan_secret_as(scx, value)?;
6998
6999    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7000}
7001
7002pub fn describe_alter_connection(
7003    _: &StatementContext,
7004    _: AlterConnectionStatement<Aug>,
7005) -> Result<StatementDesc, PlanError> {
7006    Ok(StatementDesc::new(None))
7007}
7008
7009generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7010
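/// Plans `ALTER CONNECTION`. `ROTATE KEYS` must appear alone; otherwise the
/// actions are partitioned into option sets and drops, validated against the
/// connection type, and checked for conflicts (e.g. setting and dropping the
/// same option). For example (illustrative syntax):
///
/// ```sql
/// ALTER CONNECTION kafka_conn SET (BROKER 'new-broker:9092') WITH (VALIDATE = true);
/// ```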
7011pub fn plan_alter_connection(
7012    scx: &StatementContext,
7013    stmt: AlterConnectionStatement<Aug>,
7014) -> Result<Plan, PlanError> {
7015    let AlterConnectionStatement {
7016        name,
7017        if_exists,
7018        actions,
7019        with_options,
7020    } = stmt;
7021    let conn_name = normalize::unresolved_item_name(name)?;
7022    let entry = match scx.catalog.resolve_item(&conn_name) {
7023        Ok(entry) => entry,
7024        Err(_) if if_exists => {
7025            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7026                name: conn_name.to_string(),
7027                object_type: ObjectType::Connection,
7028            });
7029
7030            return Ok(Plan::AlterNoop(AlterNoopPlan {
7031                object_type: ObjectType::Connection,
7032            }));
7033        }
7034        Err(e) => return Err(e.into()),
7035    };
7036
7037    let connection = entry.connection()?;
7038
7039    if actions
7040        .iter()
7041        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7042    {
7043        if actions.len() > 1 {
7044            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7045        }
7046
7047        if !with_options.is_empty() {
7048            sql_bail!(
7049                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7050                with_options
7051                    .iter()
7052                    .map(|o| o.to_ast_string_simple())
7053                    .join(", ")
7054            );
7055        }
7056
7057        if !matches!(connection, Connection::Ssh(_)) {
7058            sql_bail!(
7059                "{} is not an SSH connection",
7060                scx.catalog.resolve_full_name(entry.name())
7061            )
7062        }
7063
7064        return Ok(Plan::AlterConnection(AlterConnectionPlan {
7065            id: entry.id(),
7066            action: crate::plan::AlterConnectionAction::RotateKeys,
7067        }));
7068    }
7069
7070    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7071    if options.validate.is_some() {
7072        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7073    }
7074
7075    let validate = match options.validate {
7076        Some(val) => val,
7077        None => {
7078            scx.catalog
7079                .system_vars()
7080                .enable_default_connection_validation()
7081                && connection.validate_by_default()
7082        }
7083    };
7084
7085    let connection_type = match connection {
7086        Connection::Aws(_) => CreateConnectionType::Aws,
7087        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7088        Connection::Kafka(_) => CreateConnectionType::Kafka,
7089        Connection::Csr(_) => CreateConnectionType::Csr,
7090        Connection::Postgres(_) => CreateConnectionType::Postgres,
7091        Connection::Ssh(_) => CreateConnectionType::Ssh,
7092        Connection::MySql(_) => CreateConnectionType::MySql,
7093        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7094        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7095    };
7096
7097    // Collect all options irrespective of action taken on them.
7098    let specified_options: BTreeSet<_> = actions
7099        .iter()
7100        .map(|action: &AlterConnectionAction<Aug>| match action {
7101            AlterConnectionAction::SetOption(option) => option.name.clone(),
7102            AlterConnectionAction::DropOption(name) => name.clone(),
7103            AlterConnectionAction::RotateKeys => unreachable!(),
7104        })
7105        .collect();
7106
7107    for invalid in INALTERABLE_OPTIONS {
7108        if specified_options.contains(invalid) {
7109            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7110        }
7111    }
7112
7113    connection::validate_options_per_connection_type(connection_type, specified_options)?;
7114
7115    // Partition operations into set and drop
7116    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
7117        actions.into_iter().partition_map(|action| match action {
7118            AlterConnectionAction::SetOption(option) => Either::Left(option),
7119            AlterConnectionAction::DropOption(name) => Either::Right(name),
7120            AlterConnectionAction::RotateKeys => unreachable!(),
7121        });
7122
7123    let set_options: BTreeMap<_, _> = set_options_vec
7124        .clone()
7125        .into_iter()
7126        .map(|option| (option.name, option.value))
7127        .collect();
7128
7129    // Type check values + avoid duplicates; we don't want to e.g. let users
7130    // drop and set the same option in the same statement, so treating drops as
7131    // sets here is fine.
7132    let connection_options_extracted =
7133        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7134
7135    let duplicates: Vec<_> = connection_options_extracted
7136        .seen
7137        .intersection(&drop_options)
7138        .collect();
7139
7140    if !duplicates.is_empty() {
7141        sql_bail!(
7142            "cannot both SET and DROP/RESET options {}",
7143            duplicates
7144                .iter()
7145                .map(|option| option.to_string())
7146                .join(", ")
7147        )
7148    }
7149
7150    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7151        let set_options_count = mutually_exclusive_options
7152            .iter()
7153            .filter(|o| set_options.contains_key(o))
7154            .count();
7155        let drop_options_count = mutually_exclusive_options
7156            .iter()
7157            .filter(|o| drop_options.contains(o))
7158            .count();
7159
7160        // Disallow setting _and_ resetting mutually exclusive options
7161        if set_options_count > 0 && drop_options_count > 0 {
7162            sql_bail!(
7163                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7164                connection_type,
7165                mutually_exclusive_options
7166                    .iter()
7167                    .map(|option| option.to_string())
7168                    .join(", ")
7169            )
7170        }
7171
7172        // If any of these mutually exclusive options is set or dropped, drop
7173        // them all. We do this "behind the scenes", even though we disallow
7174        // users from doing so explicitly, because this is the mechanism by
7175        // which we overwrite values elsewhere in the code.
7176        if set_options_count > 0 || drop_options_count > 0 {
7177            drop_options.extend(mutually_exclusive_options.iter().cloned());
7178        }
7179
7180        // n.b. if mutually exclusive options are set, those will error when we
7181        // try to replan the connection.
7182    }
7183
7184    Ok(Plan::AlterConnection(AlterConnectionPlan {
7185        id: entry.id(),
7186        action: crate::plan::AlterConnectionAction::AlterOptions {
7187            set_options,
7188            drop_options,
7189            validate,
7190        },
7191    }))
7192}
7193
7194pub fn describe_alter_sink(
7195    _: &StatementContext,
7196    _: AlterSinkStatement<Aug>,
7197) -> Result<StatementDesc, PlanError> {
7198    Ok(StatementDesc::new(None))
7199}
7200
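/// Plans `ALTER SINK ... SET FROM ...` by re-parsing the sink's original
/// `CREATE SINK` statement, swapping in the new upstream relation, re-planning
/// the result, and bumping the sink's version. For example:
///
/// ```sql
/// ALTER SINK snk SET FROM new_upstream;
/// ```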
7201pub fn plan_alter_sink(
7202    scx: &mut StatementContext,
7203    stmt: AlterSinkStatement<Aug>,
7204) -> Result<Plan, PlanError> {
7205    let AlterSinkStatement {
7206        sink_name,
7207        if_exists,
7208        action,
7209    } = stmt;
7210
7211    let object_type = ObjectType::Sink;
7212    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7213
7214    let Some(item) = item else {
7215        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7216            name: sink_name.to_string(),
7217            object_type,
7218        });
7219
7220        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7221    };
7222    // Always ALTER objects from their latest version.
7223    let item = item.at_version(RelationVersionSelector::Latest);
7224
7225    match action {
7226        AlterSinkAction::ChangeRelation(new_from) => {
7227            // First we reconstruct the original CREATE SINK statement
7228            let create_sql = item.create_sql();
7229            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7230            let [stmt]: [StatementParseResult; 1] = stmts
7231                .try_into()
7232                .expect("create sql of sink was not exactly one statement");
7233            let Statement::CreateSink(stmt) = stmt.ast else {
7234                unreachable!("invalid create SQL for sink item");
7235            };
7236
7237            // Then resolve and swap the resolved from relation to the new one
7238            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7239            stmt.from = new_from;
7240
7241            // Finally re-plan the modified create sink statement to verify the new configuration is valid
7242            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7243                unreachable!("invalid plan for CREATE SINK statement");
7244            };
7245
7246            plan.sink.version += 1;
7247
7248            Ok(Plan::AlterSink(AlterSinkPlan {
7249                item_id: item.id(),
7250                global_id: item.global_id(),
7251                sink: plan.sink,
7252                with_snapshot: plan.with_snapshot,
7253                in_cluster: plan.in_cluster,
7254            }))
7255        }
7256        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7257        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7258    }
7259}
7260
7261pub fn describe_alter_source(
7262    _: &StatementContext,
7263    _: AlterSourceStatement<Aug>,
7264) -> Result<StatementDesc, PlanError> {
7265    // TODO: describe the statement's options here.
7266    Ok(StatementDesc::new(None))
7267}
7268
7269generate_extracted_config!(
7270    AlterSourceAddSubsourceOption,
7271    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7272    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7273    (Details, String)
7274);
7275
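/// Plans `ALTER SOURCE`. Only `SET`/`RESET` of `RETAIN HISTORY` is planned
/// here (by delegating to `alter_retain_history`); subsource changes and
/// reference refreshes are handled during purification, and other options
/// cannot be modified after creation.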
7276pub fn plan_alter_source(
7277    scx: &mut StatementContext,
7278    stmt: AlterSourceStatement<Aug>,
7279) -> Result<Plan, PlanError> {
7280    let AlterSourceStatement {
7281        source_name,
7282        if_exists,
7283        action,
7284    } = stmt;
7285    let object_type = ObjectType::Source;
7286
7287    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7288        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7289            name: source_name.to_string(),
7290            object_type,
7291        });
7292
7293        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7294    }
7295
7296    match action {
7297        AlterSourceAction::SetOptions(options) => {
7298            let mut options = options.into_iter();
7299            let option = options.next().expect("SET requires at least one option");
7300            if option.name == CreateSourceOptionName::RetainHistory {
7301                if options.next().is_some() {
7302                    sql_bail!("RETAIN HISTORY must be the only option");
7303                }
7304                return alter_retain_history(
7305                    scx,
7306                    object_type,
7307                    if_exists,
7308                    UnresolvedObjectName::Item(source_name),
7309                    option.value,
7310                );
7311            }
7312            // N.B. we use this statement during purification in a way that cannot
7313            // be planned directly.
7314            sql_bail!(
7315                "Cannot modify the {} of a SOURCE.",
7316                option.name.to_ast_string_simple()
7317            );
7318        }
7319        AlterSourceAction::ResetOptions(reset) => {
7320            let mut options = reset.into_iter();
7321            let option = options.next().expect("RESET requires at least one option");
7322            if option == CreateSourceOptionName::RetainHistory {
7323                if options.next().is_some() {
7324                    sql_bail!("RETAIN HISTORY must be the only option");
7325                }
7326                return alter_retain_history(
7327                    scx,
7328                    object_type,
7329                    if_exists,
7330                    UnresolvedObjectName::Item(source_name),
7331                    None,
7332                );
7333            }
7334            sql_bail!(
7335                "Cannot modify the {} of a SOURCE.",
7336                option.to_ast_string_simple()
7337            );
7338        }
7339        AlterSourceAction::DropSubsources { .. } => {
7340            sql_bail!("ALTER SOURCE...DROP SUBSOURCE is no longer supported; use DROP SOURCE")
7341        }
7342        AlterSourceAction::AddSubsources { .. } => {
7343            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7344        }
7345        AlterSourceAction::RefreshReferences => {
7346            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7347        }
7348    };
7349}
7350
7351pub fn describe_alter_system_set(
7352    _: &StatementContext,
7353    _: AlterSystemSetStatement,
7354) -> Result<StatementDesc, PlanError> {
7355    Ok(StatementDesc::new(None))
7356}
7357
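/// Plans `ALTER SYSTEM SET <name> = <value>`, e.g.:
///
/// ```sql
/// ALTER SYSTEM SET max_connections = 100;
/// ```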
7358pub fn plan_alter_system_set(
7359    _: &StatementContext,
7360    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7361) -> Result<Plan, PlanError> {
7362    let name = name.to_string();
7363    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7364        name,
7365        value: scl::plan_set_variable_to(to)?,
7366    }))
7367}
7368
7369pub fn describe_alter_system_reset(
7370    _: &StatementContext,
7371    _: AlterSystemResetStatement,
7372) -> Result<StatementDesc, PlanError> {
7373    Ok(StatementDesc::new(None))
7374}
7375
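/// Plans `ALTER SYSTEM RESET <name>`, which restores a system variable to its
/// default value.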
7376pub fn plan_alter_system_reset(
7377    _: &StatementContext,
7378    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7379) -> Result<Plan, PlanError> {
7380    let name = name.to_string();
7381    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7382}
7383
7384pub fn describe_alter_system_reset_all(
7385    _: &StatementContext,
7386    _: AlterSystemResetAllStatement,
7387) -> Result<StatementDesc, PlanError> {
7388    Ok(StatementDesc::new(None))
7389}
7390
7391pub fn plan_alter_system_reset_all(
7392    _: &StatementContext,
7393    _: AlterSystemResetAllStatement,
7394) -> Result<Plan, PlanError> {
7395    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7396}
7397
7398pub fn describe_alter_role(
7399    _: &StatementContext,
7400    _: AlterRoleStatement<Aug>,
7401) -> Result<StatementDesc, PlanError> {
7402    Ok(StatementDesc::new(None))
7403}
7404
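/// Plans `ALTER ROLE`, which either updates the role's attributes or sets a
/// role-level default for a session variable. For example (illustrative
/// syntax):
///
/// ```sql
/// ALTER ROLE alice SET cluster = 'quickstart';
/// ```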
7405pub fn plan_alter_role(
7406    scx: &StatementContext,
7407    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7408) -> Result<Plan, PlanError> {
7409    let option = match option {
7410        AlterRoleOption::Attributes(attrs) => {
7411            let attrs = plan_role_attributes(attrs, scx)?;
7412            PlannedAlterRoleOption::Attributes(attrs)
7413        }
7414        AlterRoleOption::Variable(variable) => {
7415            let var = plan_role_variable(variable)?;
7416            PlannedAlterRoleOption::Variable(var)
7417        }
7418    };
7419
7420    Ok(Plan::AlterRole(AlterRolePlan {
7421        id: name.id,
7422        name: name.name,
7423        option,
7424    }))
7425}
7426
7427pub fn describe_alter_table_add_column(
7428    _: &StatementContext,
7429    _: AlterTableAddColumnStatement<Aug>,
7430) -> Result<StatementDesc, PlanError> {
7431    Ok(StatementDesc::new(None))
7432}
7433
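/// Plans `ALTER TABLE ... ADD COLUMN ...`. The statement is feature-gated,
/// columns are always added to the latest version of the table, and new
/// columns are currently forced to be nullable. For example:
///
/// ```sql
/// ALTER TABLE t ADD COLUMN IF NOT EXISTS c text;
/// ```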
7434pub fn plan_alter_table_add_column(
7435    scx: &StatementContext,
7436    stmt: AlterTableAddColumnStatement<Aug>,
7437) -> Result<Plan, PlanError> {
7438    let AlterTableAddColumnStatement {
7439        if_exists,
7440        name,
7441        if_col_not_exist,
7442        column_name,
7443        data_type,
7444    } = stmt;
7445    let object_type = ObjectType::Table;
7446
7447    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7448
7449    let (relation_id, item_name, desc) =
7450        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7451            Some(item) => {
7452                // Always add columns to the latest version of the item.
7453                let item_name = scx.catalog.resolve_full_name(item.name());
7454                let item = item.at_version(RelationVersionSelector::Latest);
7455                let desc = item.relation_desc().expect("table has desc").into_owned();
7456                (item.id(), item_name, desc)
7457            }
7458            None => {
7459                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7460                    name: name.to_ast_string_simple(),
7461                    object_type,
7462                });
7463                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7464            }
7465        };
7466
7467    let column_name = ColumnName::from(column_name.as_str());
7468    if desc.get_by_name(&column_name).is_some() {
7469        if if_col_not_exist {
7470            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7471                column_name: column_name.to_string(),
7472                object_name: item_name.item,
7473            });
7474            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7475        } else {
7476            return Err(PlanError::ColumnAlreadyExists {
7477                column_name,
7478                object_name: item_name.item,
7479            });
7480        }
7481    }
7482
7483    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7484    // TODO(alter_table): Support non-nullable columns with default values.
7485    let column_type = scalar_type.nullable(true);
7486    // "unresolve" our data type so we can later update the persisted create_sql.
7487    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7488
7489    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7490        relation_id,
7491        column_name,
7492        column_type,
7493        raw_sql_type,
7494    }))
7495}
7496
7497pub fn describe_alter_materialized_view_apply_replacement(
7498    _: &StatementContext,
7499    _: AlterMaterializedViewApplyReplacementStatement,
7500) -> Result<StatementDesc, PlanError> {
7501    Ok(StatementDesc::new(None))
7502}
7503
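/// Plans `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT` (illustrative
/// syntax), which is feature-gated and requires that the named replacement
/// was created targeting this materialized view.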
7504pub fn plan_alter_materialized_view_apply_replacement(
7505    scx: &StatementContext,
7506    stmt: AlterMaterializedViewApplyReplacementStatement,
7507) -> Result<Plan, PlanError> {
7508    let AlterMaterializedViewApplyReplacementStatement {
7509        if_exists,
7510        name,
7511        replacement_name,
7512    } = stmt;
7513
7514    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7515
7516    let object_type = ObjectType::MaterializedView;
7517    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7518        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7519            name: name.to_ast_string_simple(),
7520            object_type,
7521        });
7522        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7523    };
7524
7525    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7526        .expect("if_exists not set");
7527
7528    if replacement.replacement_target() != Some(mv.id()) {
7529        return Err(PlanError::InvalidReplacement {
7530            item_type: mv.item_type(),
7531            item_name: scx.catalog.minimal_qualification(mv.name()),
7532            replacement_type: replacement.item_type(),
7533            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7534        });
7535    }
7536
7537    Ok(Plan::AlterMaterializedViewApplyReplacement(
7538        AlterMaterializedViewApplyReplacementPlan {
7539            id: mv.id(),
7540            replacement_id: replacement.id(),
7541        },
7542    ))
7543}
7544
7545pub fn describe_comment(
7546    _: &StatementContext,
7547    _: CommentStatement<Aug>,
7548) -> Result<StatementDesc, PlanError> {
7549    Ok(StatementDesc::new(None))
7550}
7551
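/// Plans `COMMENT ON ...`. Comments are capped at `MAX_COMMENT_LENGTH` bytes,
/// and column comments record a 1-based column position alongside the parent
/// object's ID. For example:
///
/// ```sql
/// COMMENT ON TABLE t IS 'my table';
/// ```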
7552pub fn plan_comment(
7553    scx: &mut StatementContext,
7554    stmt: CommentStatement<Aug>,
7555) -> Result<Plan, PlanError> {
7556    const MAX_COMMENT_LENGTH: usize = 1024;
7557
7558    let CommentStatement { object, comment } = stmt;
7559
7560    // TODO(parkmycar): Make max comment length configurable.
7561    if let Some(c) = &comment {
7562        if c.len() > MAX_COMMENT_LENGTH {
7563            return Err(PlanError::CommentTooLong {
7564                length: c.len(),
7565                max_size: MAX_COMMENT_LENGTH,
7566            });
7567        }
7568    }
7569
7570    let (object_id, column_pos) = match &object {
7571        com_ty @ CommentObjectType::Table { name }
7572        | com_ty @ CommentObjectType::View { name }
7573        | com_ty @ CommentObjectType::MaterializedView { name }
7574        | com_ty @ CommentObjectType::Index { name }
7575        | com_ty @ CommentObjectType::Func { name }
7576        | com_ty @ CommentObjectType::Connection { name }
7577        | com_ty @ CommentObjectType::Source { name }
7578        | com_ty @ CommentObjectType::Sink { name }
7579        | com_ty @ CommentObjectType::Secret { name }
7580        | com_ty @ CommentObjectType::ContinualTask { name } => {
7581            let item = scx.get_item_by_resolved_name(name)?;
7582            match (com_ty, item.item_type()) {
7583                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7584                    (CommentObjectId::Table(item.id()), None)
7585                }
7586                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7587                    (CommentObjectId::View(item.id()), None)
7588                }
7589                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7590                    (CommentObjectId::MaterializedView(item.id()), None)
7591                }
7592                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7593                    (CommentObjectId::Index(item.id()), None)
7594                }
7595                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7596                    (CommentObjectId::Func(item.id()), None)
7597                }
7598                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7599                    (CommentObjectId::Connection(item.id()), None)
7600                }
7601                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7602                    (CommentObjectId::Source(item.id()), None)
7603                }
7604                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7605                    (CommentObjectId::Sink(item.id()), None)
7606                }
7607                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7608                    (CommentObjectId::Secret(item.id()), None)
7609                }
7610                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7611                    (CommentObjectId::ContinualTask(item.id()), None)
7612                }
7613                (com_ty, cat_ty) => {
7614                    let expected_type = match com_ty {
7615                        CommentObjectType::Table { .. } => ObjectType::Table,
7616                        CommentObjectType::View { .. } => ObjectType::View,
7617                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7618                        CommentObjectType::Index { .. } => ObjectType::Index,
7619                        CommentObjectType::Func { .. } => ObjectType::Func,
7620                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7621                        CommentObjectType::Source { .. } => ObjectType::Source,
7622                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7623                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7624                        _ => unreachable!("these are the only types we match on"),
7625                    };
7626
7627                    return Err(PlanError::InvalidObjectType {
7628                        expected_type: SystemObjectType::Object(expected_type),
7629                        actual_type: SystemObjectType::Object(cat_ty.into()),
7630                        object_name: item.name().item.clone(),
7631                    });
7632                }
7633            }
7634        }
7635        CommentObjectType::Type { ty } => match ty {
7636            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7637                sql_bail!("cannot comment on anonymous list or map type");
7638            }
7639            ResolvedDataType::Named { id, modifiers, .. } => {
7640                if !modifiers.is_empty() {
7641                    sql_bail!("cannot comment on type with modifiers");
7642                }
7643                (CommentObjectId::Type(*id), None)
7644            }
7645            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7646        },
7647        CommentObjectType::Column { name } => {
7648            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7649            match item.item_type() {
7650                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7651                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7652                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7653                CatalogItemType::MaterializedView => {
7654                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7655                }
7656                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7657                r => {
7658                    return Err(PlanError::Unsupported {
7659                        feature: format!("Specifying comments on a column of {r}"),
7660                        discussion_no: None,
7661                    });
7662                }
7663            }
7664        }
7665        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7666        CommentObjectType::Database { name } => {
7667            (CommentObjectId::Database(*name.database_id()), None)
7668        }
7669        CommentObjectType::Schema { name } => {
7670            // Temporary schemas cannot have comments - they are connection-specific
7671            // and transient. With lazy temporary schema creation, the temp schema
7672            // may not exist yet, but we still need to return the correct error.
7673            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7674                sql_bail!(
7675                    "cannot comment on schema {} because it is a temporary schema",
7676                    mz_repr::namespaces::MZ_TEMP_SCHEMA
7677                );
7678            }
7679            (
7680                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7681                None,
7682            )
7683        }
7684        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7685        CommentObjectType::ClusterReplica { name } => {
7686            let replica = scx.catalog.resolve_cluster_replica(name)?;
7687            (
7688                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7689                None,
7690            )
7691        }
7692        CommentObjectType::NetworkPolicy { name } => {
7693            (CommentObjectId::NetworkPolicy(name.id), None)
7694        }
7695    };
7696
7697    // Note: the `mz_comments` table uses an `Int4` for the column position, but in catalog
7698    // storage we store a `usize`, which would be a `UInt8`. We check here that the conversion
7699    // is safe, because this is the easiest place to raise an error.
7700    //
7701    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7702    if let Some(p) = column_pos {
7703        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7704            max_num_columns: MAX_NUM_COLUMNS,
7705            req_num_columns: p,
7706        })?;
7707    }
7708
7709    Ok(Plan::Comment(CommentPlan {
7710        object_id,
7711        sub_component: column_pos,
7712        comment,
7713    }))
7714}
7715
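/// Resolves a cluster by name, returning `Ok(None)` instead of an error when
/// `if_exists` is set and the cluster does not exist.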
7716pub(crate) fn resolve_cluster<'a>(
7717    scx: &'a StatementContext,
7718    name: &'a Ident,
7719    if_exists: bool,
7720) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7721    match scx.resolve_cluster(Some(name)) {
7722        Ok(cluster) => Ok(Some(cluster)),
7723        Err(_) if if_exists => Ok(None),
7724        Err(e) => Err(e),
7725    }
7726}
7727
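/// Resolves a replica of the named cluster, returning `Ok(None)` under
/// `if_exists` when either the cluster or the replica is missing.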
7728pub(crate) fn resolve_cluster_replica<'a>(
7729    scx: &'a StatementContext,
7730    name: &QualifiedReplica,
7731    if_exists: bool,
7732) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7733    match scx.resolve_cluster(Some(&name.cluster)) {
7734        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7735            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7736            None if if_exists => Ok(None),
7737            None => Err(sql_err!(
7738                "CLUSTER {} has no CLUSTER REPLICA named {}",
7739                cluster.name(),
7740                name.replica.as_str().quoted(),
7741            )),
7742        },
7743        Err(_) if if_exists => Ok(None),
7744        Err(e) => Err(e),
7745    }
7746}
7747
7748pub(crate) fn resolve_database<'a>(
7749    scx: &'a StatementContext,
7750    name: &'a UnresolvedDatabaseName,
7751    if_exists: bool,
7752) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7753    match scx.resolve_database(name) {
7754        Ok(database) => Ok(Some(database)),
7755        Err(_) if if_exists => Ok(None),
7756        Err(e) => Err(e),
7757    }
7758}
7759
7760pub(crate) fn resolve_schema<'a>(
7761    scx: &'a StatementContext,
7762    name: UnresolvedSchemaName,
7763    if_exists: bool,
7764) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7765    match scx.resolve_schema(name) {
7766        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7767        Err(_) if if_exists => Ok(None),
7768        Err(e) => Err(e),
7769    }
7770}
7771
7772pub(crate) fn resolve_network_policy<'a>(
7773    scx: &'a StatementContext,
7774    name: Ident,
7775    if_exists: bool,
7776) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7777    match scx.catalog.resolve_network_policy(&name.to_string()) {
7778        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7779            id: policy.id(),
7780            name: policy.name().to_string(),
7781        })),
7782        Err(_) if if_exists => Ok(None),
7783        Err(e) => Err(e.into()),
7784    }
7785}
7786
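/// Resolves an item (or, for `ObjectType::Type`, a type) by name, erroring if
/// the resolved item's type does not match the expected `object_type`.
/// Returns `Ok(None)` under `if_exists` when the item does not exist.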
7787pub(crate) fn resolve_item_or_type<'a>(
7788    scx: &'a StatementContext,
7789    object_type: ObjectType,
7790    name: UnresolvedItemName,
7791    if_exists: bool,
7792) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7793    let name = normalize::unresolved_item_name(name)?;
7794    let catalog_item = match object_type {
7795        ObjectType::Type => scx.catalog.resolve_type(&name),
7796        _ => scx.catalog.resolve_item(&name),
7797    };
7798
7799    match catalog_item {
7800        Ok(item) => {
7801            let is_type = ObjectType::from(item.item_type());
7802            if object_type == is_type {
7803                Ok(Some(item))
7804            } else {
7805                Err(PlanError::MismatchedObjectType {
7806                    name: scx.catalog.minimal_qualification(item.name()),
7807                    is_type,
7808                    expected_type: object_type,
7809                })
7810            }
7811        }
7812        Err(_) if if_exists => Ok(None),
7813        Err(e) => Err(e.into()),
7814    }
7815}
7816
7817/// Returns an error if the given cluster is a managed cluster.
7818fn ensure_cluster_is_not_managed(
7819    scx: &StatementContext,
7820    cluster_id: ClusterId,
7821) -> Result<(), PlanError> {
7822    let cluster = scx.catalog.get_cluster(cluster_id);
7823    if cluster.is_managed() {
7824        Err(PlanError::ManagedCluster {
7825            cluster_name: cluster.name().to_string(),
7826        })
7827    } else {
7828        Ok(())
7829    }
7830}