Skip to main content

mz_sql/plan/statement/
ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL).
11//!
12//! This module houses the handlers for statements that modify the catalog, like
13//! `ALTER`, `CREATE`, and `DROP`.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::Itertools;
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_arrow_util::builder::ArrowBuilder;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::soft_panic_or_log;
32use mz_ore::str::StrExt;
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43    preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58    ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement,
59    CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement,
60    CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement,
61    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
62    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
63    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
64    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
65    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
66    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
67    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
68    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
69    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
70    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
71    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
72    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
73    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
74    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
75    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
76    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
77    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
78    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
79    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
80    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
81    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
82    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
83};
84use mz_sql_parser::ident;
85use mz_sql_parser::parser::StatementParseResult;
86use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
87use mz_storage_types::connections::{Connection, KafkaTopicOptions};
88use mz_storage_types::sinks::{
89    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
90    SinkEnvelope, StorageSinkConnection, iceberg_type_overrides,
91};
92use mz_storage_types::sources::encoding::{
93    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
94    SourceDataEncoding, included_column_desc,
95};
96use mz_storage_types::sources::envelope::{
97    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
98};
99use mz_storage_types::sources::kafka::{
100    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
101};
102use mz_storage_types::sources::load_generator::{
103    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
104    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
105};
106use mz_storage_types::sources::mysql::{
107    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
108};
109use mz_storage_types::sources::postgres::{
110    PostgresSourceConnection, PostgresSourcePublicationDetails,
111    ProtoPostgresSourcePublicationDetails,
112};
113use mz_storage_types::sources::sql_server::{
114    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
115};
116use mz_storage_types::sources::{
117    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
118    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
119    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
120    SqlServerSourceExtras, Timeline,
121};
122use prost::Message;
123
124use crate::ast::display::AstDisplay;
125use crate::catalog::{
126    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
127    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
128};
129use crate::iceberg::IcebergSinkConfigOptionExtracted;
130use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
131use crate::names::{
132    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
133    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
134    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
135};
136use crate::normalize::{self, ident};
137use crate::plan::error::PlanError;
138use crate::plan::query::{
139    ExprContext, QueryLifetime, plan_expr, scalar_type_from_catalog, scalar_type_from_sql,
140};
141use crate::plan::scope::Scope;
142use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
143use crate::plan::statement::{StatementContext, StatementDesc, scl};
144use crate::plan::typeconv::CastContext;
145use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
146use crate::plan::{
147    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
148    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
149    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
150    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
151    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
152    AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
153    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
154    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
155    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
156    CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
157    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
158    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
159    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
160    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
161    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
162    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
163    WebhookValidation, literal, plan_utils, query, transform_ast,
164};
165use crate::session::vars::{
166    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
167    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
168    ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS,
169};
170use crate::{names, parse};
171
172mod connection;
173
// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
//
/// Largest number of columns a planned relation may have before planning is refused.
///
/// `plan_create_webhook_source` compares its column count against this value and returns
/// `PlanError::TooManyColumns` when the count is strictly greater.
const MAX_NUM_COLUMNS: usize = 256;
179
/// Lazily compiled regular expression that matches a whole string consisting of the letter `r`
/// followed by one or more digits (for example `r1` or `r42`).
///
/// The pattern is compiled on first access; the `unwrap` can only fire if the literal pattern
/// itself is not a valid regular expression.
///
/// NOTE(review): judging by the name, this presumably recognizes the names given to replicas of
/// managed clusters. The use sites are outside this part of the file — confirm.
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
182
183/// Given a relation desc and a column list, checks that:
184/// - the column list is a prefix of the desc;
185/// - all the listed columns are types that have meaningful Persist-level ordering.
186fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
187    if partition_by.len() > desc.len() {
188        tracing::error!(
189            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
190            desc.len(),
191            partition_by.len()
192        );
193        partition_by.truncate(desc.len());
194    }
195
196    let desc_prefix = desc.iter().take(partition_by.len());
197    for (idx, ((desc_name, desc_type), partition_name)) in
198        desc_prefix.zip_eq(partition_by).enumerate()
199    {
200        let partition_name = normalize::column_name(partition_name);
201        if *desc_name != partition_name {
202            sql_bail!(
203                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
204            );
205        }
206        if !preserves_order(&desc_type.scalar_type) {
207            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
208        }
209    }
210    Ok(())
211}
212
213pub fn describe_create_database(
214    _: &StatementContext,
215    _: CreateDatabaseStatement,
216) -> Result<StatementDesc, PlanError> {
217    Ok(StatementDesc::new(None))
218}
219
220pub fn plan_create_database(
221    _: &StatementContext,
222    CreateDatabaseStatement {
223        name,
224        if_not_exists,
225    }: CreateDatabaseStatement,
226) -> Result<Plan, PlanError> {
227    Ok(Plan::CreateDatabase(CreateDatabasePlan {
228        name: normalize::ident(name.0),
229        if_not_exists,
230    }))
231}
232
233pub fn describe_create_schema(
234    _: &StatementContext,
235    _: CreateSchemaStatement,
236) -> Result<StatementDesc, PlanError> {
237    Ok(StatementDesc::new(None))
238}
239
240pub fn plan_create_schema(
241    scx: &StatementContext,
242    CreateSchemaStatement {
243        mut name,
244        if_not_exists,
245    }: CreateSchemaStatement,
246) -> Result<Plan, PlanError> {
247    if name.0.len() > 2 {
248        sql_bail!("schema name {} has more than two components", name);
249    }
250    let schema_name = normalize::ident(
251        name.0
252            .pop()
253            .expect("names always have at least one component"),
254    );
255    let database_spec = match name.0.pop() {
256        None => match scx.catalog.active_database() {
257            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
258            None => sql_bail!("no database specified and no active database"),
259        },
260        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
261            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
262            Err(_) => sql_bail!("invalid database {}", n.as_str()),
263        },
264    };
265    Ok(Plan::CreateSchema(CreateSchemaPlan {
266        database_spec,
267        schema_name,
268        if_not_exists,
269    }))
270}
271
272pub fn describe_create_table(
273    _: &StatementContext,
274    _: CreateTableStatement<Aug>,
275) -> Result<StatementDesc, PlanError> {
276    Ok(StatementDesc::new(None))
277}
278
279pub fn plan_create_table(
280    scx: &StatementContext,
281    stmt: CreateTableStatement<Aug>,
282) -> Result<Plan, PlanError> {
283    let CreateTableStatement {
284        name,
285        columns,
286        constraints,
287        if_not_exists,
288        temporary,
289        with_options,
290    } = &stmt;
291
292    let names: Vec<_> = columns
293        .iter()
294        .filter(|c| {
295            // This set of `names` is used to create the initial RelationDesc.
296            // Columns that have been added at later versions of the table will
297            // get added further below.
298            let is_versioned = c
299                .options
300                .iter()
301                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
302            !is_versioned
303        })
304        .map(|c| normalize::column_name(c.name.clone()))
305        .collect();
306
307    if let Some(dup) = names.iter().duplicates().next() {
308        sql_bail!("column {} specified more than once", dup.quoted());
309    }
310
311    // Build initial relation type that handles declared data types
312    // and NOT NULL constraints.
313    let mut column_types = Vec::with_capacity(columns.len());
314    let mut defaults = Vec::with_capacity(columns.len());
315    let mut changes = BTreeMap::new();
316    let mut keys = Vec::new();
317
318    for (i, c) in columns.into_iter().enumerate() {
319        let aug_data_type = &c.data_type;
320        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
321        let mut nullable = true;
322        let mut default = Expr::null();
323        let mut versioned = false;
324        for option in &c.options {
325            match &option.option {
326                ColumnOption::NotNull => nullable = false,
327                ColumnOption::Default(expr) => {
328                    // Ensure expression can be planned and yields the correct
329                    // type.
330                    let mut expr = expr.clone();
331                    transform_ast::transform(scx, &mut expr)?;
332                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
333                    default = expr.clone();
334                }
335                ColumnOption::Unique { is_primary } => {
336                    keys.push(vec![i]);
337                    if *is_primary {
338                        nullable = false;
339                    }
340                }
341                ColumnOption::Versioned { action, version } => {
342                    let version = RelationVersion::from(*version);
343                    versioned = true;
344
345                    let name = normalize::column_name(c.name.clone());
346                    let typ = ty.clone().nullable(nullable);
347
348                    changes.insert(version, (action.clone(), name, typ));
349                }
350                other => {
351                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
352                }
353            }
354        }
355        // TODO(alter_table): This assumes all versioned columns are at the
356        // end. This will no longer be true when we support dropping columns.
357        if !versioned {
358            column_types.push(ty.nullable(nullable));
359        }
360        defaults.push(default);
361    }
362
363    let mut seen_primary = false;
364    'c: for constraint in constraints {
365        match constraint {
366            TableConstraint::Unique {
367                name: _,
368                columns,
369                is_primary,
370                nulls_not_distinct,
371            } => {
372                if seen_primary && *is_primary {
373                    sql_bail!(
374                        "multiple primary keys for table {} are not allowed",
375                        name.to_ast_string_stable()
376                    );
377                }
378                seen_primary = *is_primary || seen_primary;
379
380                let mut key = vec![];
381                for column in columns {
382                    let column = normalize::column_name(column.clone());
383                    match names.iter().position(|name| *name == column) {
384                        None => sql_bail!("unknown column in constraint: {}", column),
385                        Some(i) => {
386                            let nullable = &mut column_types[i].nullable;
387                            if *is_primary {
388                                if *nulls_not_distinct {
389                                    sql_bail!(
390                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
391                                    );
392                                }
393
394                                *nullable = false;
395                            } else if !(*nulls_not_distinct || !*nullable) {
396                                // Non-primary key unique constraints are only keys if all of their
397                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
398                                break 'c;
399                            }
400
401                            key.push(i);
402                        }
403                    }
404                }
405
406                if *is_primary {
407                    keys.insert(0, key);
408                } else {
409                    keys.push(key);
410                }
411            }
412            TableConstraint::ForeignKey { .. } => {
413                // Foreign key constraints are not presently enforced. We allow
414                // them with feature flags for sqllogictest's sake.
415                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
416            }
417            TableConstraint::Check { .. } => {
418                // Check constraints are not presently enforced. We allow them
419                // with feature flags for sqllogictest's sake.
420                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
421            }
422        }
423    }
424
425    if !keys.is_empty() {
426        // Unique constraints are not presently enforced. We allow them with feature flags for
427        // sqllogictest's sake.
428        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
429    }
430
431    let typ = SqlRelationType::new(column_types).with_keys(keys);
432
433    let temporary = *temporary;
434    let name = if temporary {
435        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
436    } else {
437        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
438    };
439
440    // Check for an object in the catalog with this same name
441    let full_name = scx.catalog.resolve_full_name(&name);
442    let partial_name = PartialItemName::from(full_name.clone());
443    // For PostgreSQL compatibility, we need to prevent creating tables when
444    // there is an existing object *or* type of the same name.
445    if let (false, Ok(item)) = (
446        if_not_exists,
447        scx.catalog.resolve_item_or_type(&partial_name),
448    ) {
449        return Err(PlanError::ItemAlreadyExists {
450            name: full_name.to_string(),
451            item_type: item.item_type(),
452        });
453    }
454
455    let desc = RelationDesc::new(typ, names);
456    let mut desc = VersionedRelationDesc::new(desc);
457    for (version, (_action, name, typ)) in changes.into_iter() {
458        let new_version = desc.add_column(name, typ);
459        if version != new_version {
460            return Err(PlanError::InvalidTable {
461                name: full_name.item,
462            });
463        }
464    }
465
466    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
467
468    // Table options should only consider the original columns, since those
469    // were the only ones in scope when the table was created.
470    //
471    // TODO(alter_table): Will need to reconsider this when we support ALTERing
472    // the PARTITION BY columns.
473    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
474    let options = plan_table_options(scx, &original_desc, with_options.clone())?;
475
476    let compaction_window = options.iter().find_map(|o| {
477        #[allow(irrefutable_let_patterns)]
478        if let crate::plan::TableOption::RetainHistory(lcw) = o {
479            Some(lcw.clone())
480        } else {
481            None
482        }
483    });
484
485    let table = Table {
486        create_sql,
487        desc,
488        temporary,
489        compaction_window,
490        data_source: TableDataSource::TableWrites { defaults },
491    };
492    Ok(Plan::CreateTable(CreateTablePlan {
493        name,
494        table,
495        if_not_exists: *if_not_exists,
496    }))
497}
498
499pub fn describe_create_table_from_source(
500    _: &StatementContext,
501    _: CreateTableFromSourceStatement<Aug>,
502) -> Result<StatementDesc, PlanError> {
503    Ok(StatementDesc::new(None))
504}
505
506pub fn describe_create_webhook_source(
507    _: &StatementContext,
508    _: CreateWebhookSourceStatement<Aug>,
509) -> Result<StatementDesc, PlanError> {
510    Ok(StatementDesc::new(None))
511}
512
513pub fn describe_create_source(
514    _: &StatementContext,
515    _: CreateSourceStatement<Aug>,
516) -> Result<StatementDesc, PlanError> {
517    Ok(StatementDesc::new(None))
518}
519
520pub fn describe_create_subsource(
521    _: &StatementContext,
522    _: CreateSubsourceStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524    Ok(StatementDesc::new(None))
525}
526
// Option declarations for the source-related statements handled in this module. Each
// invocation names an AST option type followed by `(OptionName, RustType)` entries; an entry
// may carry a third `Default(..)` element.
//
// NOTE(review): the macro definition is not in this part of the file. By analogy with the
// imported `KafkaSinkConfigOptionExtracted`, each invocation presumably generates a
// `<OptionType>Extracted` struct with one field per entry — confirm against the macro.

// Options of `CreateSourceOption`: a timestamp interval (`Duration`) and a retain-history
// setting (`OptionalDuration`).
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Options of `PgConfigOption`. `TextColumns` and `ExcludeColumns` are lists of unresolved item
// names declared with `Default(vec![])` (presumably: an empty list when the option is omitted).
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Options of `MySqlConfigOption`; same shape as the Postgres set minus `Publication`.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Options of `SqlServerConfigOption`; same entries as the MySQL set.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
554
555pub fn plan_create_webhook_source(
556    scx: &StatementContext,
557    mut stmt: CreateWebhookSourceStatement<Aug>,
558) -> Result<Plan, PlanError> {
559    if stmt.is_table {
560        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
561    }
562
563    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
564    // we plan to normalize when we canonicalize the create statement.
565    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
566    let create_sql =
567        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
568
569    let CreateWebhookSourceStatement {
570        name,
571        if_not_exists,
572        body_format,
573        include_headers,
574        validate_using,
575        is_table,
576        // We resolved `in_cluster` above, so we want to ignore it here.
577        in_cluster: _,
578    } = stmt;
579
580    let validate_using = validate_using
581        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
582        .transpose()?;
583    if let Some(WebhookValidation { expression, .. }) = &validate_using {
584        // If the validation expression doesn't reference any part of the request, then we should
585        // return an error because it's almost definitely wrong.
586        if !expression.contains_column() {
587            return Err(PlanError::WebhookValidationDoesNotUseColumns);
588        }
589        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
590        // allow calls to `now()` because some webhook providers recommend rejecting requests that
591        // are older than a certain threshold.
592        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
593            return Err(PlanError::WebhookValidationNonDeterministic);
594        }
595    }
596
597    let body_format = match body_format {
598        Format::Bytes => WebhookBodyFormat::Bytes,
599        Format::Json { array } => WebhookBodyFormat::Json { array },
600        Format::Text => WebhookBodyFormat::Text,
601        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
602        ty => {
603            return Err(PlanError::Unsupported {
604                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
605                discussion_no: None,
606            });
607        }
608    };
609
610    let mut column_ty = vec![
611        // Always include the body of the request as the first column.
612        SqlColumnType {
613            scalar_type: SqlScalarType::from(body_format),
614            nullable: false,
615        },
616    ];
617    let mut column_names = vec!["body".to_string()];
618
619    let mut headers = WebhookHeaders::default();
620
621    // Include a `headers` column, possibly filtered.
622    if let Some(filters) = include_headers.column {
623        column_ty.push(SqlColumnType {
624            scalar_type: SqlScalarType::Map {
625                value_type: Box::new(SqlScalarType::String),
626                custom_id: None,
627            },
628            nullable: false,
629        });
630        column_names.push("headers".to_string());
631
632        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
633            filters.into_iter().partition_map(|filter| {
634                if filter.block {
635                    itertools::Either::Right(filter.header_name)
636                } else {
637                    itertools::Either::Left(filter.header_name)
638                }
639            });
640        headers.header_column = Some(WebhookHeaderFilters { allow, block });
641    }
642
643    // Map headers to specific columns.
644    for header in include_headers.mappings {
645        let scalar_type = header
646            .use_bytes
647            .then_some(SqlScalarType::Bytes)
648            .unwrap_or(SqlScalarType::String);
649        column_ty.push(SqlColumnType {
650            scalar_type,
651            nullable: true,
652        });
653        column_names.push(header.column_name.into_string());
654
655        let column_idx = column_ty.len() - 1;
656        // Double check we're consistent with column names.
657        assert_eq!(
658            column_idx,
659            column_names.len() - 1,
660            "header column names and types don't match"
661        );
662        headers
663            .mapped_headers
664            .insert(column_idx, (header.header_name, header.use_bytes));
665    }
666
667    // Validate our columns.
668    let mut unique_check = HashSet::with_capacity(column_names.len());
669    for name in &column_names {
670        if !unique_check.insert(name) {
671            return Err(PlanError::AmbiguousColumn(name.clone().into()));
672        }
673    }
674    if column_names.len() > MAX_NUM_COLUMNS {
675        return Err(PlanError::TooManyColumns {
676            max_num_columns: MAX_NUM_COLUMNS,
677            req_num_columns: column_names.len(),
678        });
679    }
680
681    let typ = SqlRelationType::new(column_ty);
682    let desc = RelationDesc::new(typ, column_names);
683
684    // Check for an object in the catalog with this same name
685    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
686    let full_name = scx.catalog.resolve_full_name(&name);
687    let partial_name = PartialItemName::from(full_name.clone());
688    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
689        return Err(PlanError::ItemAlreadyExists {
690            name: full_name.to_string(),
691            item_type: item.item_type(),
692        });
693    }
694
695    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
696    // such, we always use a default of EpochMilliseconds.
697    let timeline = Timeline::EpochMilliseconds;
698
699    let plan = if is_table {
700        let data_source = DataSourceDesc::Webhook {
701            validate_using,
702            body_format,
703            headers,
704            cluster_id: Some(in_cluster.id()),
705        };
706        let data_source = TableDataSource::DataSource {
707            desc: data_source,
708            timeline,
709        };
710        Plan::CreateTable(CreateTablePlan {
711            name,
712            if_not_exists,
713            table: Table {
714                create_sql,
715                desc: VersionedRelationDesc::new(desc),
716                temporary: false,
717                compaction_window: None,
718                data_source,
719            },
720        })
721    } else {
722        let data_source = DataSourceDesc::Webhook {
723            validate_using,
724            body_format,
725            headers,
726            // Important: The cluster is set at the `Source` level.
727            cluster_id: None,
728        };
729        Plan::CreateSource(CreateSourcePlan {
730            name,
731            source: Source {
732                create_sql,
733                data_source,
734                desc,
735                compaction_window: None,
736            },
737            if_not_exists,
738            timeline,
739            in_cluster: Some(in_cluster.id()),
740        })
741    };
742
743    Ok(plan)
744}
745
746pub fn plan_create_source(
747    scx: &StatementContext,
748    mut stmt: CreateSourceStatement<Aug>,
749) -> Result<Plan, PlanError> {
750    let CreateSourceStatement {
751        name,
752        in_cluster: _,
753        col_names,
754        connection: source_connection,
755        envelope,
756        if_not_exists,
757        format,
758        key_constraint,
759        include_metadata,
760        with_options,
761        external_references: referenced_subsources,
762        progress_subsource,
763    } = &stmt;
764
765    mz_ore::soft_assert_or_log!(
766        referenced_subsources.is_none(),
767        "referenced subsources must be cleared in purification"
768    );
769
770    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
771        && scx.catalog.system_vars().force_source_table_syntax();
772
773    // If the new source table syntax is forced all the options related to the primary
774    // source output should be un-set.
775    if force_source_table_syntax {
776        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
777            Err(PlanError::UseTablesForSources(
778                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
779            ))?;
780        }
781    }
782
783    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
784
785    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
786        && include_metadata
787            .iter()
788            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
789    {
790        // TODO(guswynn): should this be `bail_unsupported!`?
791        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
792    }
793    if !matches!(
794        source_connection,
795        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
796    ) && !include_metadata.is_empty()
797    {
798        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
799    }
800
801    if !include_metadata.is_empty()
802        && !matches!(
803            envelope,
804            ast::SourceEnvelope::Upsert { .. }
805                | ast::SourceEnvelope::None
806                | ast::SourceEnvelope::Debezium
807        )
808    {
809        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
810    }
811
812    let external_connection =
813        plan_generic_source_connection(scx, source_connection, include_metadata)?;
814
815    let CreateSourceOptionExtracted {
816        timestamp_interval,
817        retain_history,
818        seen: _,
819    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;
820
821    let metadata_columns_desc = match external_connection {
822        GenericSourceConnection::Kafka(KafkaSourceConnection {
823            ref metadata_columns,
824            ..
825        }) => kafka_metadata_columns_desc(metadata_columns),
826        _ => vec![],
827    };
828
829    // Generate the relation description for the primary export of the source.
830    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
831        scx,
832        &envelope,
833        format,
834        Some(external_connection.default_key_desc()),
835        external_connection.default_value_desc(),
836        include_metadata,
837        metadata_columns_desc,
838        &external_connection,
839    )?;
840    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;
841
842    let names: Vec<_> = desc.iter_names().cloned().collect();
843    if let Some(dup) = names.iter().duplicates().next() {
844        sql_bail!("column {} specified more than once", dup.quoted());
845    }
846
847    // Apply user-specified key constraint
848    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
849        // Don't remove this without addressing
850        // https://github.com/MaterializeInc/database-issues/issues/4371.
851        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;
852
853        let key_columns = columns
854            .into_iter()
855            .map(normalize::column_name)
856            .collect::<Vec<_>>();
857
858        let mut uniq = BTreeSet::new();
859        for col in key_columns.iter() {
860            if !uniq.insert(col) {
861                sql_bail!("Repeated column name in source key constraint: {}", col);
862            }
863        }
864
865        let key_indices = key_columns
866            .iter()
867            .map(|col| {
868                let name_idx = desc
869                    .get_by_name(col)
870                    .map(|(idx, _type)| idx)
871                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
872                if desc.get_unambiguous_name(name_idx).is_none() {
873                    sql_bail!("Ambiguous column in source key constraint: {}", col);
874                }
875                Ok(name_idx)
876            })
877            .collect::<Result<Vec<_>, _>>()?;
878
879        if !desc.typ().keys.is_empty() {
880            return Err(key_constraint_err(&desc, &key_columns));
881        } else {
882            desc = desc.with_key(key_indices);
883        }
884    }
885
886    let timestamp_interval = match timestamp_interval {
887        Some(duration) => {
888            // Only validate bounds for new statements (pcx is Some), not during
889            // catalog deserialization (pcx is None). Previously persisted sources
890            // may have intervals that no longer fall within the current bounds.
891            if scx.pcx.is_some() {
892                let min = scx.catalog.system_vars().min_timestamp_interval();
893                let max = scx.catalog.system_vars().max_timestamp_interval();
894                if duration < min || duration > max {
895                    return Err(PlanError::InvalidTimestampInterval {
896                        min,
897                        max,
898                        requested: duration,
899                    });
900                }
901            }
902            duration
903        }
904        None => scx.catalog.system_vars().default_timestamp_interval(),
905    };
906
907    let (desc, data_source) = match progress_subsource {
908        Some(name) => {
909            let DeferredItemName::Named(name) = name else {
910                sql_bail!("[internal error] progress subsource must be named during purification");
911            };
912            let ResolvedItemName::Item { id, .. } = name else {
913                sql_bail!("[internal error] invalid target id");
914            };
915
916            let details = match external_connection {
917                GenericSourceConnection::Kafka(ref c) => {
918                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
919                        metadata_columns: c.metadata_columns.clone(),
920                    })
921                }
922                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
923                    LoadGenerator::Auction
924                    | LoadGenerator::Marketing
925                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
926                    LoadGenerator::Counter { .. }
927                    | LoadGenerator::Clock
928                    | LoadGenerator::Datums
929                    | LoadGenerator::KeyValue(_) => {
930                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
931                            output: LoadGeneratorOutput::Default,
932                        })
933                    }
934                },
935                GenericSourceConnection::Postgres(_)
936                | GenericSourceConnection::MySql(_)
937                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
938            };
939
940            let data_source = DataSourceDesc::OldSyntaxIngestion {
941                desc: SourceDesc {
942                    connection: external_connection,
943                    timestamp_interval,
944                },
945                progress_subsource: *id,
946                data_config: SourceExportDataConfig {
947                    encoding,
948                    envelope: envelope.clone(),
949                },
950                details,
951            };
952            (desc, data_source)
953        }
954        None => {
955            let desc = external_connection.timestamp_desc();
956            let data_source = DataSourceDesc::Ingestion(SourceDesc {
957                connection: external_connection,
958                timestamp_interval,
959            });
960            (desc, data_source)
961        }
962    };
963
964    let if_not_exists = *if_not_exists;
965    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
966
967    // Check for an object in the catalog with this same name
968    let full_name = scx.catalog.resolve_full_name(&name);
969    let partial_name = PartialItemName::from(full_name.clone());
970    // For PostgreSQL compatibility, we need to prevent creating sources when
971    // there is an existing object *or* type of the same name.
972    if let (false, Ok(item)) = (
973        if_not_exists,
974        scx.catalog.resolve_item_or_type(&partial_name),
975    ) {
976        return Err(PlanError::ItemAlreadyExists {
977            name: full_name.to_string(),
978            item_type: item.item_type(),
979        });
980    }
981
982    // We will rewrite the cluster if one is not provided, so we must use the
983    // `in_cluster` value we plan to normalize when we canonicalize the create
984    // statement.
985    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
986
987    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;
988
989    // Determine a default timeline for the source.
990    let timeline = match envelope {
991        SourceEnvelope::CdcV2 => {
992            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
993        }
994        _ => Timeline::EpochMilliseconds,
995    };
996
997    let compaction_window = plan_retain_history_option(scx, retain_history)?;
998
999    let source = Source {
1000        create_sql,
1001        data_source,
1002        desc,
1003        compaction_window,
1004    };
1005
1006    Ok(Plan::CreateSource(CreateSourcePlan {
1007        name,
1008        source,
1009        if_not_exists,
1010        timeline,
1011        in_cluster: Some(in_cluster.id()),
1012    }))
1013}
1014
1015pub fn plan_generic_source_connection(
1016    scx: &StatementContext<'_>,
1017    source_connection: &CreateSourceConnection<Aug>,
1018    include_metadata: &Vec<SourceIncludeMetadata>,
1019) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1020    Ok(match source_connection {
1021        CreateSourceConnection::Kafka {
1022            connection,
1023            options,
1024        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1025            scx,
1026            connection,
1027            options,
1028            include_metadata,
1029        )?),
1030        CreateSourceConnection::Postgres {
1031            connection,
1032            options,
1033        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1034            scx, connection, options,
1035        )?),
1036        CreateSourceConnection::SqlServer {
1037            connection,
1038            options,
1039        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1040            scx, connection, options,
1041        )?),
1042        CreateSourceConnection::MySql {
1043            connection,
1044            options,
1045        } => {
1046            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1047        }
1048        CreateSourceConnection::LoadGenerator { generator, options } => {
1049            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1050                scx,
1051                generator,
1052                options,
1053                include_metadata,
1054            )?)
1055        }
1056    })
1057}
1058
1059fn plan_load_generator_source_connection(
1060    scx: &StatementContext<'_>,
1061    generator: &ast::LoadGenerator,
1062    options: &Vec<LoadGeneratorOption<Aug>>,
1063    include_metadata: &Vec<SourceIncludeMetadata>,
1064) -> Result<LoadGeneratorSourceConnection, PlanError> {
1065    let load_generator =
1066        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1067    let LoadGeneratorOptionExtracted {
1068        tick_interval,
1069        as_of,
1070        up_to,
1071        ..
1072    } = options.clone().try_into()?;
1073    let tick_micros = match tick_interval {
1074        Some(interval) => Some(interval.as_micros().try_into()?),
1075        None => None,
1076    };
1077    if up_to < as_of {
1078        sql_bail!("UP TO cannot be less than AS OF");
1079    }
1080    Ok(LoadGeneratorSourceConnection {
1081        load_generator,
1082        tick_micros,
1083        as_of,
1084        up_to,
1085    })
1086}
1087
1088fn plan_mysql_source_connection(
1089    scx: &StatementContext<'_>,
1090    connection: &ResolvedItemName,
1091    options: &Vec<MySqlConfigOption<Aug>>,
1092) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1093    let connection_item = scx.get_item_by_resolved_name(connection)?;
1094    match connection_item.connection()? {
1095        Connection::MySql(connection) => connection,
1096        _ => sql_bail!(
1097            "{} is not a MySQL connection",
1098            scx.catalog.resolve_full_name(connection_item.name())
1099        ),
1100    };
1101    let MySqlConfigOptionExtracted {
1102        details,
1103        // text/exclude columns are already part of the source-exports and are only included
1104        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1105        // be removed once we drop support for implicitly created subsources.
1106        text_columns: _,
1107        exclude_columns: _,
1108        seen: _,
1109    } = options.clone().try_into()?;
1110    let details = details
1111        .as_ref()
1112        .ok_or_else(|| internal_err!("MySQL source missing details"))?;
1113    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1114    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1115    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1116    Ok(MySqlSourceConnection {
1117        connection: connection_item.id(),
1118        connection_id: connection_item.id(),
1119        details,
1120    })
1121}
1122
1123fn plan_sqlserver_source_connection(
1124    scx: &StatementContext<'_>,
1125    connection: &ResolvedItemName,
1126    options: &Vec<SqlServerConfigOption<Aug>>,
1127) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1128    let connection_item = scx.get_item_by_resolved_name(connection)?;
1129    match connection_item.connection()? {
1130        Connection::SqlServer(connection) => connection,
1131        _ => sql_bail!(
1132            "{} is not a SQL Server connection",
1133            scx.catalog.resolve_full_name(connection_item.name())
1134        ),
1135    };
1136    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1137    let details = details
1138        .as_ref()
1139        .ok_or_else(|| internal_err!("SQL Server source missing details"))?;
1140    let extras = hex::decode(details)
1141        .map_err(|e| sql_err!("{e}"))
1142        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1143        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1144    Ok(SqlServerSourceConnection {
1145        connection_id: connection_item.id(),
1146        connection: connection_item.id(),
1147        extras,
1148    })
1149}
1150
1151fn plan_postgres_source_connection(
1152    scx: &StatementContext<'_>,
1153    connection: &ResolvedItemName,
1154    options: &Vec<PgConfigOption<Aug>>,
1155) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1156    let connection_item = scx.get_item_by_resolved_name(connection)?;
1157    let PgConfigOptionExtracted {
1158        details,
1159        publication,
1160        // text columns are already part of the source-exports and are only included
1161        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1162        // be removed once we drop support for implicitly created subsources.
1163        text_columns: _,
1164        // exclude columns are already part of the source-exports and are only included
1165        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1166        // be removed once we drop support for implicitly created subsources.
1167        exclude_columns: _,
1168        seen: _,
1169    } = options.clone().try_into()?;
1170    let details = details
1171        .as_ref()
1172        .ok_or_else(|| internal_err!("Postgres source missing details"))?;
1173    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1174    let details =
1175        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1176    let publication_details =
1177        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1178    Ok(PostgresSourceConnection {
1179        connection: connection_item.id(),
1180        connection_id: connection_item.id(),
1181        // Validated during purification.
1182        publication: publication.ok_or_else(|| internal_err!("PUBLICATION option is required"))?,
1183        publication_details,
1184    })
1185}
1186
1187fn plan_kafka_source_connection(
1188    scx: &StatementContext<'_>,
1189    connection_name: &ResolvedItemName,
1190    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1191    include_metadata: &Vec<SourceIncludeMetadata>,
1192) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1193    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1194    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1195        sql_bail!(
1196            "{} is not a kafka connection",
1197            scx.catalog.resolve_full_name(connection_item.name())
1198        )
1199    }
1200    let KafkaSourceConfigOptionExtracted {
1201        group_id_prefix,
1202        topic,
1203        topic_metadata_refresh_interval,
1204        start_timestamp: _, // purified into `start_offset`
1205        start_offset,
1206        seen: _,
1207    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1208    // Validated during purification.
1209    let topic = topic.ok_or_else(|| internal_err!("TOPIC option is required"))?;
1210    let mut start_offsets = BTreeMap::new();
1211    if let Some(offsets) = start_offset {
1212        for (part, offset) in offsets.iter().enumerate() {
1213            if *offset < 0 {
1214                sql_bail!("START OFFSET must be a nonnegative integer");
1215            }
1216            start_offsets.insert(i32::try_from(part)?, *offset);
1217        }
1218    }
1219    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1220        // This is a librdkafka-enforced restriction that, if violated,
1221        // would result in a runtime error for the source.
1222        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1223    }
1224    let metadata_columns = include_metadata
1225        .into_iter()
1226        .flat_map(|item| match item {
1227            SourceIncludeMetadata::Timestamp { alias } => {
1228                let name = match alias {
1229                    Some(name) => name.to_string(),
1230                    None => "timestamp".to_owned(),
1231                };
1232                Some((name, KafkaMetadataKind::Timestamp))
1233            }
1234            SourceIncludeMetadata::Partition { alias } => {
1235                let name = match alias {
1236                    Some(name) => name.to_string(),
1237                    None => "partition".to_owned(),
1238                };
1239                Some((name, KafkaMetadataKind::Partition))
1240            }
1241            SourceIncludeMetadata::Offset { alias } => {
1242                let name = match alias {
1243                    Some(name) => name.to_string(),
1244                    None => "offset".to_owned(),
1245                };
1246                Some((name, KafkaMetadataKind::Offset))
1247            }
1248            SourceIncludeMetadata::Headers { alias } => {
1249                let name = match alias {
1250                    Some(name) => name.to_string(),
1251                    None => "headers".to_owned(),
1252                };
1253                Some((name, KafkaMetadataKind::Headers))
1254            }
1255            SourceIncludeMetadata::Header {
1256                alias,
1257                key,
1258                use_bytes,
1259            } => Some((
1260                alias.to_string(),
1261                KafkaMetadataKind::Header {
1262                    key: key.clone(),
1263                    use_bytes: *use_bytes,
1264                },
1265            )),
1266            SourceIncludeMetadata::Key { .. } => {
1267                // handled below
1268                None
1269            }
1270        })
1271        .collect();
1272    Ok(KafkaSourceConnection {
1273        connection: connection_item.id(),
1274        connection_id: connection_item.id(),
1275        topic,
1276        start_offsets,
1277        group_id_prefix,
1278        topic_metadata_refresh_interval,
1279        metadata_columns,
1280    })
1281}
1282
1283fn apply_source_envelope_encoding(
1284    scx: &StatementContext,
1285    envelope: &ast::SourceEnvelope,
1286    format: &Option<FormatSpecifier<Aug>>,
1287    key_desc: Option<RelationDesc>,
1288    value_desc: RelationDesc,
1289    include_metadata: &[SourceIncludeMetadata],
1290    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
1291    source_connection: &GenericSourceConnection<ReferencedConnection>,
1292) -> Result<
1293    (
1294        RelationDesc,
1295        SourceEnvelope,
1296        Option<SourceDataEncoding<ReferencedConnection>>,
1297    ),
1298    PlanError,
1299> {
1300    let encoding = match format {
1301        Some(format) => Some(get_encoding(scx, format, envelope)?),
1302        None => None,
1303    };
1304
1305    let (key_desc, value_desc) = match &encoding {
1306        Some(encoding) => {
1307            // If we are applying an encoding we need to ensure that the incoming value_desc is a
1308            // single column of type bytes.
1309            match value_desc.typ().columns() {
1310                [typ] => match typ.scalar_type {
1311                    SqlScalarType::Bytes => {}
1312                    _ => sql_bail!(
1313                        "The schema produced by the source is incompatible with format decoding"
1314                    ),
1315                },
1316                _ => sql_bail!(
1317                    "The schema produced by the source is incompatible with format decoding"
1318                ),
1319            }
1320
1321            let (key_desc, value_desc) = encoding.desc()?;
1322
1323            // TODO(petrosagg): This piece of code seems to be making a statement about the
1324            // nullability of the NONE envelope when the source is Kafka. As written, the code
1325            // misses opportunities to mark columns as not nullable and is over conservative. For
1326            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
1327            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
1328            // should be replaced with precise type-level reasoning.
1329            let key_desc = key_desc.map(|desc| {
1330                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1331                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1332                if is_kafka && is_envelope_none {
1333                    RelationDesc::from_names_and_types(
1334                        desc.into_iter()
1335                            .map(|(name, typ)| (name, typ.nullable(true))),
1336                    )
1337                } else {
1338                    desc
1339                }
1340            });
1341            (key_desc, value_desc)
1342        }
1343        None => (key_desc, value_desc),
1344    };
1345
1346    // KEY VALUE load generators are the only UPSERT source that
1347    // has no encoding but defaults to `INCLUDE KEY`.
1348    //
1349    // As discussed
1350    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1351    // removing this special case amounts to deciding how to handle null keys
1352    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
1353    // this special case in.
1354    //
1355    // Note that this is safe because this generator is
1356    // 1. The only source with no encoding that can have its key included.
1357    // 2. Never produces null keys (or values, for that matter).
1358    let key_envelope_no_encoding = matches!(
1359        source_connection,
1360        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1361            load_generator: LoadGenerator::KeyValue(_),
1362            ..
1363        })
1364    );
1365    let mut key_envelope = get_key_envelope(
1366        include_metadata,
1367        encoding.as_ref(),
1368        key_envelope_no_encoding,
1369    )?;
1370
1371    match (&envelope, &key_envelope) {
1372        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1373        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1374            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1375        ),
1376        _ => {}
1377    };
1378
1379    // Not all source envelopes are compatible with all source connections.
1380    // Whoever constructs the source ingestion pipeline is responsible for
1381    // choosing compatible envelopes and connections.
1382    //
1383    // TODO(guswynn): ambiguously assert which connections and envelopes are
1384    // compatible in typechecking
1385    //
1386    // TODO: remove bails as more support for upsert is added.
1387    let envelope = match &envelope {
1388        // TODO: fixup key envelope
1389        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1390        ast::SourceEnvelope::Debezium => {
1391            //TODO check that key envelope is not set
1392            let after_idx = match typecheck_debezium(&value_desc) {
1393                Ok((_before_idx, after_idx)) => Ok(after_idx),
1394                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1395                    Some(DataEncoding::Avro(_)) => Err(type_err),
1396                    _ => Err(sql_err!(
1397                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1398                    )),
1399                },
1400            }?;
1401
1402            UnplannedSourceEnvelope::Upsert {
1403                style: UpsertStyle::Debezium { after_idx },
1404            }
1405        }
1406        ast::SourceEnvelope::Upsert {
1407            value_decode_err_policy,
1408        } => {
1409            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1410                None => {
1411                    if !key_envelope_no_encoding {
1412                        bail_unsupported!(format!(
1413                            "UPSERT requires a key/value format: {:?}",
1414                            format
1415                        ))
1416                    }
1417                    None
1418                }
1419                Some(key_encoding) => Some(key_encoding),
1420            };
1421            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
1422            // specified.
1423            if key_envelope == KeyEnvelope::None {
1424                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1425            }
1426            // If the value decode error policy is not set we use the default upsert style.
1427            let style = match value_decode_err_policy.as_slice() {
1428                [] => UpsertStyle::Default(key_envelope),
1429                [SourceErrorPolicy::Inline { alias }] => {
1430                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1431                    UpsertStyle::ValueErrInline {
1432                        key_envelope,
1433                        error_column: alias
1434                            .as_ref()
1435                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1436                    }
1437                }
1438                _ => {
1439                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1440                }
1441            };
1442
1443            UnplannedSourceEnvelope::Upsert { style }
1444        }
1445        ast::SourceEnvelope::CdcV2 => {
1446            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1447            //TODO check that key envelope is not set
1448            match format {
1449                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1450                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1451            }
1452            UnplannedSourceEnvelope::CdcV2
1453        }
1454    };
1455
1456    let metadata_desc = included_column_desc(metadata_columns_desc);
1457    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1458
1459    Ok((desc, envelope, encoding))
1460}
1461
1462/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1463/// of columns and constraints.
1464fn plan_source_export_desc(
1465    scx: &StatementContext,
1466    name: &UnresolvedItemName,
1467    columns: &Vec<ColumnDef<Aug>>,
1468    constraints: &Vec<TableConstraint<Aug>>,
1469) -> Result<RelationDesc, PlanError> {
1470    let names: Vec<_> = columns
1471        .iter()
1472        .map(|c| normalize::column_name(c.name.clone()))
1473        .collect();
1474
1475    if let Some(dup) = names.iter().duplicates().next() {
1476        sql_bail!("column {} specified more than once", dup.quoted());
1477    }
1478
1479    // Build initial relation type that handles declared data types
1480    // and NOT NULL constraints.
1481    let mut column_types = Vec::with_capacity(columns.len());
1482    let mut keys = Vec::new();
1483
1484    for (i, c) in columns.into_iter().enumerate() {
1485        let aug_data_type = &c.data_type;
1486        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1487        let mut nullable = true;
1488        for option in &c.options {
1489            match &option.option {
1490                ColumnOption::NotNull => nullable = false,
1491                ColumnOption::Default(_) => {
1492                    bail_unsupported!("Source export with default value")
1493                }
1494                ColumnOption::Unique { is_primary } => {
1495                    keys.push(vec![i]);
1496                    if *is_primary {
1497                        nullable = false;
1498                    }
1499                }
1500                other => {
1501                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1502                }
1503            }
1504        }
1505        column_types.push(ty.nullable(nullable));
1506    }
1507
1508    let mut seen_primary = false;
1509    'c: for constraint in constraints {
1510        match constraint {
1511            TableConstraint::Unique {
1512                name: _,
1513                columns,
1514                is_primary,
1515                nulls_not_distinct,
1516            } => {
1517                if seen_primary && *is_primary {
1518                    sql_bail!(
1519                        "multiple primary keys for source export {} are not allowed",
1520                        name.to_ast_string_stable()
1521                    );
1522                }
1523                seen_primary = *is_primary || seen_primary;
1524
1525                let mut key = vec![];
1526                for column in columns {
1527                    let column = normalize::column_name(column.clone());
1528                    match names.iter().position(|name| *name == column) {
1529                        None => sql_bail!("unknown column in constraint: {}", column),
1530                        Some(i) => {
1531                            let nullable = &mut column_types[i].nullable;
1532                            if *is_primary {
1533                                if *nulls_not_distinct {
1534                                    sql_bail!(
1535                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1536                                    );
1537                                }
1538                                *nullable = false;
1539                            } else if !(*nulls_not_distinct || !*nullable) {
1540                                // Non-primary key unique constraints are only keys if all of their
1541                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1542                                break 'c;
1543                            }
1544
1545                            key.push(i);
1546                        }
1547                    }
1548                }
1549
1550                if *is_primary {
1551                    keys.insert(0, key);
1552                } else {
1553                    keys.push(key);
1554                }
1555            }
1556            TableConstraint::ForeignKey { .. } => {
1557                bail_unsupported!("Source export with a foreign key")
1558            }
1559            TableConstraint::Check { .. } => {
1560                bail_unsupported!("Source export with a check constraint")
1561            }
1562        }
1563    }
1564
1565    let typ = SqlRelationType::new(column_types).with_keys(keys);
1566    let desc = RelationDesc::new(typ, names);
1567    Ok(desc)
1568}
1569
// Declares the `WITH (...)` options accepted by `CREATE SUBSOURCE`. The
// resulting `CreateSubsourceOptionExtracted` value is destructured in
// `plan_create_subsource`, which reads one field per option listed here
// (plus `seen`).
//
// `Progress`, `TextColumns` and `ExcludeColumns` declare explicit defaults;
// the options without a `Default(..)` are handled as `Option`s by the planner.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1579
1580pub fn plan_create_subsource(
1581    scx: &StatementContext,
1582    stmt: CreateSubsourceStatement<Aug>,
1583) -> Result<Plan, PlanError> {
1584    let CreateSubsourceStatement {
1585        name,
1586        columns,
1587        of_source,
1588        constraints,
1589        if_not_exists,
1590        with_options,
1591    } = &stmt;
1592
1593    let CreateSubsourceOptionExtracted {
1594        progress,
1595        retain_history,
1596        external_reference,
1597        text_columns,
1598        exclude_columns,
1599        details,
1600        seen: _,
1601    } = with_options.clone().try_into()?;
1602
1603    // This invariant is enforced during purification; we are responsible for
1604    // creating the AST for subsources as a response to CREATE SOURCE
1605    // statements, so this would fire in integration testing if we failed to
1606    // uphold it.
1607    if !(progress ^ (external_reference.is_some() && of_source.is_some())) {
1608        bail_internal!(
1609            "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1610        );
1611    }
1612
1613    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615    let data_source = if let Some(source_reference) = of_source {
1616        // If the new source table syntax is forced we should not be creating any non-progress
1617        // subsources.
1618        if scx.catalog.system_vars().enable_create_table_from_source()
1619            && scx.catalog.system_vars().force_source_table_syntax()
1620        {
1621            Err(PlanError::UseTablesForSources(
1622                "CREATE SUBSOURCE".to_string(),
1623            ))?;
1624        }
1625
1626        // This is a subsource with the "natural" dependency order, i.e. it is
1627        // not a legacy subsource with the inverted structure.
1628        let ingestion_id = *source_reference.item_id();
1629        let external_reference = external_reference.ok_or_else(|| {
1630            sql_err!("CREATE SUBSOURCE with REFERENCES requires EXTERNAL REFERENCE option")
1631        })?;
1632
1633        // Decode the details option stored on the subsource statement, which contains information
1634        // created during the purification process.
1635        let details = details
1636            .as_ref()
1637            .ok_or_else(|| internal_err!("source-export subsource missing details"))?;
1638        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1639        let details =
1640            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1641        let details =
1642            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1643        let details = match details {
1644            SourceExportStatementDetails::Postgres { table } => {
1645                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1646                    column_casts: crate::pure::postgres::generate_column_casts(
1647                        scx,
1648                        &table,
1649                        &text_columns,
1650                    )?,
1651                    table,
1652                })
1653            }
1654            SourceExportStatementDetails::MySql {
1655                table,
1656                initial_gtid_set,
1657                binlog_full_metadata,
1658            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1659                table,
1660                initial_gtid_set,
1661                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1662                exclude_columns: exclude_columns
1663                    .into_iter()
1664                    .map(|c| c.into_string())
1665                    .collect(),
1666                binlog_full_metadata,
1667            }),
1668            SourceExportStatementDetails::SqlServer {
1669                table,
1670                capture_instance,
1671                initial_lsn,
1672            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1673                capture_instance,
1674                table,
1675                initial_lsn,
1676                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1677                exclude_columns: exclude_columns
1678                    .into_iter()
1679                    .map(|c| c.into_string())
1680                    .collect(),
1681            }),
1682            SourceExportStatementDetails::LoadGenerator { output } => {
1683                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1684            }
1685            SourceExportStatementDetails::Kafka {} => {
1686                bail_unsupported!("subsources cannot reference Kafka sources")
1687            }
1688        };
1689        DataSourceDesc::IngestionExport {
1690            ingestion_id,
1691            external_reference,
1692            details,
1693            // Subsources don't currently support non-default envelopes / encoding
1694            data_config: SourceExportDataConfig {
1695                envelope: SourceEnvelope::None(NoneEnvelope {
1696                    key_envelope: KeyEnvelope::None,
1697                    key_arity: 0,
1698                }),
1699                encoding: None,
1700            },
1701        }
1702    } else if progress {
1703        DataSourceDesc::Progress
1704    } else {
1705        sql_bail!("CREATE SUBSOURCE must specify one of PROGRESS or REFERENCES option")
1706    };
1707
1708    let if_not_exists = *if_not_exists;
1709    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1710
1711    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1712
1713    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1714    let source = Source {
1715        create_sql,
1716        data_source,
1717        desc,
1718        compaction_window,
1719    };
1720
1721    Ok(Plan::CreateSource(CreateSourcePlan {
1722        name,
1723        source,
1724        if_not_exists,
1725        timeline: Timeline::EpochMilliseconds,
1726        in_cluster: None,
1727    }))
1728}
1729
// Declares the `WITH (...)` options accepted by `CREATE TABLE ... FROM SOURCE`.
// The resulting `TableFromSourceOptionExtracted` value is destructured in
// `plan_create_table_from_source`, which reads one field per option listed
// here (plus `seen`).
//
// `TextColumns` and `ExcludeColumns` default to empty lists; the remaining
// options are handled as `Option`s by the planner.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1738
1739pub fn plan_create_table_from_source(
1740    scx: &StatementContext,
1741    stmt: CreateTableFromSourceStatement<Aug>,
1742) -> Result<Plan, PlanError> {
1743    if !scx.catalog.system_vars().enable_create_table_from_source() {
1744        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1745    }
1746
1747    let CreateTableFromSourceStatement {
1748        name,
1749        columns,
1750        constraints,
1751        if_not_exists,
1752        source,
1753        external_reference,
1754        envelope,
1755        format,
1756        include_metadata,
1757        with_options,
1758    } = &stmt;
1759
1760    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1761
1762    let TableFromSourceOptionExtracted {
1763        text_columns,
1764        exclude_columns,
1765        retain_history,
1766        partition_by,
1767        details,
1768        seen: _,
1769    } = with_options.clone().try_into()?;
1770
1771    let source_item = scx.get_item_by_resolved_name(source)?;
1772    let ingestion_id = source_item.id();
1773
1774    // Decode the details option stored on the statement, which contains information
1775    // created during the purification process.
1776    let details = details
1777        .as_ref()
1778        .ok_or_else(|| internal_err!("source-export missing details"))?;
1779    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1780    let details =
1781        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1782    let details =
1783        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1784
1785    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1786        && include_metadata
1787            .iter()
1788            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1789    {
1790        // TODO(guswynn): should this be `bail_unsupported!`?
1791        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1792    }
1793    if !matches!(
1794        details,
1795        SourceExportStatementDetails::Kafka { .. }
1796            | SourceExportStatementDetails::LoadGenerator { .. }
1797    ) && !include_metadata.is_empty()
1798    {
1799        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1800    }
1801
1802    let details = match details {
1803        SourceExportStatementDetails::Postgres { table } => {
1804            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1805                column_casts: crate::pure::postgres::generate_column_casts(
1806                    scx,
1807                    &table,
1808                    &text_columns,
1809                )?,
1810                table,
1811            })
1812        }
1813        SourceExportStatementDetails::MySql {
1814            table,
1815            initial_gtid_set,
1816            binlog_full_metadata,
1817        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1818            table,
1819            initial_gtid_set,
1820            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1821            exclude_columns: exclude_columns
1822                .into_iter()
1823                .map(|c| c.into_string())
1824                .collect(),
1825            binlog_full_metadata,
1826        }),
1827        SourceExportStatementDetails::SqlServer {
1828            table,
1829            capture_instance,
1830            initial_lsn,
1831        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1832            table,
1833            capture_instance,
1834            initial_lsn,
1835            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1836            exclude_columns: exclude_columns
1837                .into_iter()
1838                .map(|c| c.into_string())
1839                .collect(),
1840        }),
1841        SourceExportStatementDetails::LoadGenerator { output } => {
1842            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1843        }
1844        SourceExportStatementDetails::Kafka {} => {
1845            if !include_metadata.is_empty()
1846                && !matches!(
1847                    envelope,
1848                    ast::SourceEnvelope::Upsert { .. }
1849                        | ast::SourceEnvelope::None
1850                        | ast::SourceEnvelope::Debezium
1851                )
1852            {
1853                // TODO(guswynn): should this be `bail_unsupported!`?
1854                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1855            }
1856
1857            let metadata_columns = include_metadata
1858                .into_iter()
1859                .flat_map(|item| match item {
1860                    SourceIncludeMetadata::Timestamp { alias } => {
1861                        let name = match alias {
1862                            Some(name) => name.to_string(),
1863                            None => "timestamp".to_owned(),
1864                        };
1865                        Some((name, KafkaMetadataKind::Timestamp))
1866                    }
1867                    SourceIncludeMetadata::Partition { alias } => {
1868                        let name = match alias {
1869                            Some(name) => name.to_string(),
1870                            None => "partition".to_owned(),
1871                        };
1872                        Some((name, KafkaMetadataKind::Partition))
1873                    }
1874                    SourceIncludeMetadata::Offset { alias } => {
1875                        let name = match alias {
1876                            Some(name) => name.to_string(),
1877                            None => "offset".to_owned(),
1878                        };
1879                        Some((name, KafkaMetadataKind::Offset))
1880                    }
1881                    SourceIncludeMetadata::Headers { alias } => {
1882                        let name = match alias {
1883                            Some(name) => name.to_string(),
1884                            None => "headers".to_owned(),
1885                        };
1886                        Some((name, KafkaMetadataKind::Headers))
1887                    }
1888                    SourceIncludeMetadata::Header {
1889                        alias,
1890                        key,
1891                        use_bytes,
1892                    } => Some((
1893                        alias.to_string(),
1894                        KafkaMetadataKind::Header {
1895                            key: key.clone(),
1896                            use_bytes: *use_bytes,
1897                        },
1898                    )),
1899                    SourceIncludeMetadata::Key { .. } => {
1900                        // handled below
1901                        None
1902                    }
1903                })
1904                .collect();
1905
1906            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1907        }
1908    };
1909
1910    let source_connection = &source_item
1911        .source_desc()?
1912        .ok_or_else(|| sql_err!("item is not a source"))?
1913        .connection;
1914
1915    // Some source-types (e.g. postgres, mysql, multi-output load-gen sources) define a value_schema
1916    // during purification and define the `columns` and `constraints` fields for the statement,
1917    // whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
1918    // we use the source connection's default schema.
1919    let (key_desc, value_desc) =
1920        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1921            let columns = match columns {
1922                TableFromSourceColumns::Defined(columns) => columns,
1923                _ => bail_internal!("expected column definitions to be present"),
1924            };
1925            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1926            (None, desc)
1927        } else {
1928            let key_desc = source_connection.default_key_desc();
1929            let value_desc = source_connection.default_value_desc();
1930            (Some(key_desc), value_desc)
1931        };
1932
1933    let metadata_columns_desc = match &details {
1934        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1935            metadata_columns, ..
1936        }) => kafka_metadata_columns_desc(metadata_columns),
1937        _ => vec![],
1938    };
1939
1940    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1941        scx,
1942        &envelope,
1943        format,
1944        key_desc,
1945        value_desc,
1946        include_metadata,
1947        metadata_columns_desc,
1948        source_connection,
1949    )?;
1950    if let TableFromSourceColumns::Named(col_names) = columns {
1951        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1952    }
1953
1954    let names: Vec<_> = desc.iter_names().cloned().collect();
1955    if let Some(dup) = names.iter().duplicates().next() {
1956        sql_bail!("column {} specified more than once", dup.quoted());
1957    }
1958
1959    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1960
1961    // Allow users to specify a timeline. If they do not, determine a default
1962    // timeline for the source.
1963    let timeline = match envelope {
1964        SourceEnvelope::CdcV2 => {
1965            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1966        }
1967        _ => Timeline::EpochMilliseconds,
1968    };
1969
1970    if let Some(partition_by) = partition_by {
1971        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1972        check_partition_by(&desc, partition_by)?;
1973    }
1974
1975    let data_source = DataSourceDesc::IngestionExport {
1976        ingestion_id,
1977        // Populated during purification.
1978        external_reference: external_reference
1979            .as_ref()
1980            .ok_or_else(|| sql_err!("EXTERNAL REFERENCE is required"))?
1981            .clone(),
1982        details,
1983        data_config: SourceExportDataConfig { envelope, encoding },
1984    };
1985
1986    let if_not_exists = *if_not_exists;
1987
1988    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1989
1990    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1991    let table = Table {
1992        create_sql,
1993        desc: VersionedRelationDesc::new(desc),
1994        temporary: false,
1995        compaction_window,
1996        data_source: TableDataSource::DataSource {
1997            desc: data_source,
1998            timeline,
1999        },
2000    };
2001
2002    Ok(Plan::CreateTable(CreateTablePlan {
2003        name,
2004        table,
2005        if_not_exists,
2006    }))
2007}
2008
// Declares the options accepted by `LOAD GENERATOR` sources. The resulting
// `LoadGeneratorOptionExtracted` value is consumed by
// `load_generator_ast_to_generator`; which options are permitted for which
// generator is enforced separately by
// `LoadGeneratorOptionExtracted::ensure_only_valid_options`.
//
// `AsOf` and `UpTo` declare explicit defaults (`0` and `u64::MAX`); the
// remaining options are handled as `Option`s by the planner.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2024
2025impl LoadGeneratorOptionExtracted {
2026    pub(super) fn ensure_only_valid_options(
2027        &self,
2028        loadgen: &ast::LoadGenerator,
2029    ) -> Result<(), PlanError> {
2030        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2031
2032        let mut options = self.seen.clone();
2033
2034        let permitted_options: &[_] = match loadgen {
2035            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2036            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2037            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2038            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2039            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2040            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2041            ast::LoadGenerator::KeyValue => &[
2042                TickInterval,
2043                Keys,
2044                SnapshotRounds,
2045                TransactionalSnapshot,
2046                ValueSize,
2047                Seed,
2048                Partitions,
2049                BatchSize,
2050            ],
2051        };
2052
2053        for o in permitted_options {
2054            options.remove(o);
2055        }
2056
2057        if !options.is_empty() {
2058            sql_bail!(
2059                "{} load generators do not support {} values",
2060                loadgen,
2061                options.iter().join(", ")
2062            )
2063        }
2064
2065        Ok(())
2066    }
2067}
2068
2069pub(crate) fn load_generator_ast_to_generator(
2070    scx: &StatementContext,
2071    loadgen: &ast::LoadGenerator,
2072    options: &[LoadGeneratorOption<Aug>],
2073    include_metadata: &[SourceIncludeMetadata],
2074) -> Result<LoadGenerator, PlanError> {
2075    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2076    extracted.ensure_only_valid_options(loadgen)?;
2077
2078    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2079        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2080    }
2081
2082    let load_generator = match loadgen {
2083        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2084        ast::LoadGenerator::Clock => {
2085            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2086            LoadGenerator::Clock
2087        }
2088        ast::LoadGenerator::Counter => {
2089            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2090            let LoadGeneratorOptionExtracted {
2091                max_cardinality, ..
2092            } = extracted;
2093            LoadGenerator::Counter { max_cardinality }
2094        }
2095        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2096        ast::LoadGenerator::Datums => {
2097            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2098            LoadGenerator::Datums
2099        }
2100        ast::LoadGenerator::Tpch => {
2101            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2102
2103            // Default to 0.01 scale factor (=10MB).
2104            let sf: f64 = scale_factor.unwrap_or(0.01);
2105            if !sf.is_finite() || sf < 0.0 {
2106                sql_bail!("unsupported scale factor {sf}");
2107            }
2108
2109            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2110                let total = (sf * multiplier).floor();
2111                let mut i = i64::try_cast_from(total)
2112                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2113                if i < 1 {
2114                    i = 1;
2115                }
2116                Ok(i)
2117            };
2118
2119            // The multiplications here are safely unchecked because they will
2120            // overflow to infinity, which will be caught by f64_to_i64.
2121            let count_supplier = f_to_i(10_000f64)?;
2122            let count_part = f_to_i(200_000f64)?;
2123            let count_customer = f_to_i(150_000f64)?;
2124            let count_orders = f_to_i(150_000f64 * 10f64)?;
2125            let count_clerk = f_to_i(1_000f64)?;
2126
2127            LoadGenerator::Tpch {
2128                count_supplier,
2129                count_part,
2130                count_customer,
2131                count_orders,
2132                count_clerk,
2133            }
2134        }
2135        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2136            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2137            let LoadGeneratorOptionExtracted {
2138                keys,
2139                snapshot_rounds,
2140                transactional_snapshot,
2141                value_size,
2142                tick_interval,
2143                seed,
2144                partitions,
2145                batch_size,
2146                ..
2147            } = extracted;
2148
2149            let mut include_offset = None;
2150            for im in include_metadata {
2151                match im {
2152                    SourceIncludeMetadata::Offset { alias } => {
2153                        include_offset = match alias {
2154                            Some(alias) => Some(alias.to_string()),
2155                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2156                        }
2157                    }
2158                    SourceIncludeMetadata::Key { .. } => continue,
2159
2160                    _ => {
2161                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2162                    }
2163                };
2164            }
2165
2166            let lgkv = KeyValueLoadGenerator {
2167                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2168                snapshot_rounds: snapshot_rounds
2169                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2170                // Defaults to true.
2171                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2172                value_size: value_size
2173                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2174                partitions: partitions
2175                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2176                tick_interval,
2177                batch_size: batch_size
2178                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2179                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2180                include_offset,
2181            };
2182
2183            if lgkv.keys == 0
2184                || lgkv.partitions == 0
2185                || lgkv.value_size == 0
2186                || lgkv.batch_size == 0
2187            {
2188                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2189            }
2190
2191            if lgkv.keys % lgkv.partitions != 0 {
2192                sql_bail!("KEYS must be a multiple of PARTITIONS")
2193            }
2194
2195            if lgkv.batch_size > lgkv.keys {
2196                sql_bail!("KEYS must be larger than BATCH SIZE")
2197            }
2198
2199            // This constraints simplifies the source implementation.
2200            // We can lift it later.
2201            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2202                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2203            }
2204
2205            if lgkv.snapshot_rounds == 0 {
2206                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2207            }
2208
2209            LoadGenerator::KeyValue(lgkv)
2210        }
2211    };
2212
2213    Ok(load_generator)
2214}
2215
2216fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2217    let before = value_desc.get_by_name(&"before".into());
2218    let (after_idx, after_ty) = value_desc
2219        .get_by_name(&"after".into())
2220        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2221    let before_idx = if let Some((before_idx, before_ty)) = before {
2222        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2223            sql_bail!("'before' column must be of type record");
2224        }
2225        if before_ty != after_ty {
2226            sql_bail!("'before' type differs from 'after' column");
2227        }
2228        Some(before_idx)
2229    } else {
2230        None
2231    };
2232    Ok((before_idx, after_idx))
2233}
2234
2235fn get_encoding(
2236    scx: &StatementContext,
2237    format: &FormatSpecifier<Aug>,
2238    envelope: &ast::SourceEnvelope,
2239) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2240    let encoding = match format {
2241        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2242        FormatSpecifier::KeyValue { key, value } => {
2243            let key = {
2244                let encoding = get_encoding_inner(scx, key)?;
2245                Some(encoding.key.unwrap_or(encoding.value))
2246            };
2247            let value = get_encoding_inner(scx, value)?.value;
2248            SourceDataEncoding { key, value }
2249        }
2250    };
2251
2252    let requires_keyvalue = matches!(
2253        envelope,
2254        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2255    );
2256    let is_keyvalue = encoding.key.is_some();
2257    if requires_keyvalue && !is_keyvalue {
2258        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2259    };
2260
2261    Ok(encoding)
2262}
2263
2264/// Determine the cluster ID to use for this item.
2265///
2266/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2267/// Because of this, do not normalize/canonicalize the create SQL statement
2268/// until after calling this function.
2269fn source_sink_cluster_config<'a, 'ctx>(
2270    scx: &'a StatementContext<'ctx>,
2271    in_cluster: &mut Option<ResolvedClusterName>,
2272) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2273    let cluster = match in_cluster {
2274        None => {
2275            let cluster = scx.catalog.resolve_cluster(None)?;
2276            *in_cluster = Some(ResolvedClusterName {
2277                id: cluster.id(),
2278                print_name: None,
2279            });
2280            cluster
2281        }
2282        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2283    };
2284
2285    Ok(cluster)
2286}
2287
// Declares the extracted form of `AvroSchemaOption`; `get_encoding_inner`
// consumes it as `AvroSchemaOptionExtracted` to read `confluent_wire_format`
// (declared here with a default of `true`) from an inline Avro schema's options.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2289
/// Intermediate description of an Avro schema, assembled by
/// `get_encoding_inner` from either an inline schema or a resolved schema
/// registry seed before being turned into `AvroEncoding`s.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, if there is one. Always `None` for inline schemas.
    pub key_schema: Option<String>,
    /// The value schema.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    pub value_reference_schemas: Vec<String>,
    /// The schema registry connection the schemas came from; `None` for
    /// inline schemas.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Forwarded into the resulting `AvroEncoding`. `get_encoding_inner` sets
    /// this to `true` for schema registry schemas and takes it from the
    /// schema options for inline schemas.
    pub confluent_wire_format: bool,
}
2301
2302fn get_encoding_inner(
2303    scx: &StatementContext,
2304    format: &Format<Aug>,
2305) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2306    let value = match format {
2307        Format::Bytes => DataEncoding::Bytes,
2308        Format::Avro(schema) => {
2309            let Schema {
2310                key_schema,
2311                value_schema,
2312                key_reference_schemas,
2313                value_reference_schemas,
2314                csr_connection,
2315                confluent_wire_format,
2316            } = match schema {
2317                // TODO(jldlaughlin): we need a way to pass in primary key information
2318                // when building a source from a string or file.
2319                AvroSchema::InlineSchema {
2320                    schema: ast::Schema { schema },
2321                    with_options,
2322                } => {
2323                    let AvroSchemaOptionExtracted {
2324                        confluent_wire_format,
2325                        ..
2326                    } = with_options.clone().try_into()?;
2327
2328                    Schema {
2329                        key_schema: None,
2330                        value_schema: schema.clone(),
2331                        key_reference_schemas: vec![],
2332                        value_reference_schemas: vec![],
2333                        csr_connection: None,
2334                        confluent_wire_format,
2335                    }
2336                }
2337                AvroSchema::Csr {
2338                    csr_connection:
2339                        CsrConnectionAvro {
2340                            connection,
2341                            seed,
2342                            key_strategy: _,
2343                            value_strategy: _,
2344                        },
2345                } => {
2346                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2347                    let csr_connection = match item.connection()? {
2348                        Connection::Csr(_) => item.id(),
2349                        _ => {
2350                            sql_bail!(
2351                                "{} is not a schema registry connection",
2352                                scx.catalog
2353                                    .resolve_full_name(item.name())
2354                                    .to_string()
2355                                    .quoted()
2356                            )
2357                        }
2358                    };
2359
2360                    if let Some(seed) = seed {
2361                        Schema {
2362                            key_schema: seed.key_schema.clone(),
2363                            value_schema: seed.value_schema.clone(),
2364                            key_reference_schemas: seed.key_reference_schemas.clone(),
2365                            value_reference_schemas: seed.value_reference_schemas.clone(),
2366                            csr_connection: Some(csr_connection),
2367                            confluent_wire_format: true,
2368                        }
2369                    } else {
2370                        sql_bail!("Avro CSR seed resolution has not been performed")
2371                    }
2372                }
2373            };
2374
2375            if let Some(key_schema) = key_schema {
2376                return Ok(SourceDataEncoding {
2377                    key: Some(DataEncoding::Avro(AvroEncoding {
2378                        schema: key_schema,
2379                        reference_schemas: key_reference_schemas,
2380                        csr_connection: csr_connection.clone(),
2381                        confluent_wire_format,
2382                    })),
2383                    value: DataEncoding::Avro(AvroEncoding {
2384                        schema: value_schema,
2385                        reference_schemas: value_reference_schemas,
2386                        csr_connection,
2387                        confluent_wire_format,
2388                    }),
2389                });
2390            } else {
2391                DataEncoding::Avro(AvroEncoding {
2392                    schema: value_schema,
2393                    reference_schemas: value_reference_schemas,
2394                    csr_connection,
2395                    confluent_wire_format,
2396                })
2397            }
2398        }
2399        Format::Protobuf(schema) => match schema {
2400            ProtobufSchema::Csr {
2401                csr_connection:
2402                    CsrConnectionProtobuf {
2403                        connection:
2404                            CsrConnection {
2405                                connection,
2406                                options,
2407                            },
2408                        seed,
2409                    },
2410            } => {
2411                if let Some(CsrSeedProtobuf { key, value }) = seed {
2412                    let item = scx.get_item_by_resolved_name(connection)?;
2413                    let _ = match item.connection()? {
2414                        Connection::Csr(connection) => connection,
2415                        _ => {
2416                            sql_bail!(
2417                                "{} is not a schema registry connection",
2418                                scx.catalog
2419                                    .resolve_full_name(item.name())
2420                                    .to_string()
2421                                    .quoted()
2422                            )
2423                        }
2424                    };
2425
2426                    if !options.is_empty() {
2427                        sql_bail!("Protobuf CSR connections do not support any options");
2428                    }
2429
2430                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2431                        descriptors: strconv::parse_bytes(&value.schema)?,
2432                        message_name: value.message_name.clone(),
2433                        confluent_wire_format: true,
2434                    });
2435                    if let Some(key) = key {
2436                        return Ok(SourceDataEncoding {
2437                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2438                                descriptors: strconv::parse_bytes(&key.schema)?,
2439                                message_name: key.message_name.clone(),
2440                                confluent_wire_format: true,
2441                            })),
2442                            value,
2443                        });
2444                    }
2445                    value
2446                } else {
2447                    sql_bail!("Protobuf CSR seed resolution has not been performed")
2448                }
2449            }
2450            ProtobufSchema::InlineSchema {
2451                message_name,
2452                schema: ast::Schema { schema },
2453            } => {
2454                let descriptors = strconv::parse_bytes(schema)?;
2455
2456                DataEncoding::Protobuf(ProtobufEncoding {
2457                    descriptors,
2458                    message_name: message_name.to_owned(),
2459                    confluent_wire_format: false,
2460                })
2461            }
2462        },
2463        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2464            regex: mz_repr::adt::regex::Regex::new(regex, false)
2465                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2466        }),
2467        Format::Csv { columns, delimiter } => {
2468            let columns = match columns {
2469                CsvColumns::Header { names } => {
2470                    if names.is_empty() {
2471                        sql_bail!("[internal error] column spec should get names in purify")
2472                    }
2473                    ColumnSpec::Header {
2474                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2475                    }
2476                }
2477                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2478            };
2479            DataEncoding::Csv(CsvEncoding {
2480                columns,
2481                delimiter: u8::try_from(*delimiter)
2482                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2483            })
2484        }
2485        Format::Json { array: false } => DataEncoding::Json,
2486        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2487        Format::Text => DataEncoding::Text,
2488    };
2489    Ok(SourceDataEncoding { key: None, value })
2490}
2491
2492/// Extract the key envelope, if it is requested
2493fn get_key_envelope(
2494    included_items: &[SourceIncludeMetadata],
2495    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2496    key_envelope_no_encoding: bool,
2497) -> Result<KeyEnvelope, PlanError> {
2498    let key_definition = included_items
2499        .iter()
2500        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2501    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2502        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2503            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2504            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2505            (Some(name), _) if key_envelope_no_encoding => {
2506                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2507            }
2508            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2509            (_, None) => {
2510                // `kd.alias` == `None` means `INCLUDE KEY`
2511                // `kd.alias` == `Some(_) means INCLUDE KEY AS ___`
2512                // These both make sense with the same error message
2513                sql_bail!(
2514                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2515                        got bare FORMAT"
2516                );
2517            }
2518        }
2519    } else {
2520        Ok(KeyEnvelope::None)
2521    }
2522}
2523
2524/// Gets the key envelope for a given key encoding when no name for the key has
2525/// been requested by the user.
2526fn get_unnamed_key_envelope(
2527    key: Option<&DataEncoding<ReferencedConnection>>,
2528) -> Result<KeyEnvelope, PlanError> {
2529    // If the key is requested but comes from an unnamed type then it gets the name "key"
2530    //
2531    // Otherwise it gets the names of the columns in the type
2532    let is_composite = match key {
2533        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2534        Some(
2535            DataEncoding::Avro(_)
2536            | DataEncoding::Csv(_)
2537            | DataEncoding::Protobuf(_)
2538            | DataEncoding::Regex { .. },
2539        ) => true,
2540        None => false,
2541    };
2542
2543    if is_composite {
2544        Ok(KeyEnvelope::Flattened)
2545    } else {
2546        Ok(KeyEnvelope::Named("key".to_string()))
2547    }
2548}
2549
2550pub fn describe_create_view(
2551    _: &StatementContext,
2552    _: CreateViewStatement<Aug>,
2553) -> Result<StatementDesc, PlanError> {
2554    Ok(StatementDesc::new(None))
2555}
2556
2557pub fn plan_view(
2558    scx: &StatementContext,
2559    def: &mut ViewDefinition<Aug>,
2560    temporary: bool,
2561) -> Result<(QualifiedItemName, View), PlanError> {
2562    let create_sql = normalize::create_statement(
2563        scx,
2564        Statement::CreateView(CreateViewStatement {
2565            if_exists: IfExistsBehavior::Error,
2566            temporary,
2567            definition: def.clone(),
2568        }),
2569    )?;
2570
2571    let ViewDefinition {
2572        name,
2573        columns,
2574        query,
2575    } = def;
2576
2577    let query::PlannedRootQuery {
2578        expr,
2579        mut desc,
2580        finishing,
2581        scope: _,
2582    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2583    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2584    // Note: Earlier, we were thinking to maybe persist the finishing information with the view
2585    // here to help with database-issues#236. However, in the meantime, there might be a better
2586    // approach to solve database-issues#236:
2587    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2588    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2589        &finishing,
2590        expr.arity()
2591    ));
2592    if expr.contains_parameters()? {
2593        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2594    }
2595
2596    let dependencies = expr
2597        .depends_on()
2598        .into_iter()
2599        .map(|gid| scx.catalog.resolve_item_id(&gid))
2600        .collect();
2601
2602    let name = if temporary {
2603        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2604    } else {
2605        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2606    };
2607
2608    plan_utils::maybe_rename_columns(
2609        format!("view {}", scx.catalog.resolve_full_name(&name)),
2610        &mut desc,
2611        columns,
2612    )?;
2613    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2614
2615    if let Some(dup) = names.iter().duplicates().next() {
2616        sql_bail!("column {} specified more than once", dup.quoted());
2617    }
2618
2619    let view = View {
2620        create_sql,
2621        expr,
2622        dependencies,
2623        column_names: names,
2624        temporary,
2625    };
2626
2627    Ok((name, view))
2628}
2629
2630pub fn plan_create_view(
2631    scx: &StatementContext,
2632    mut stmt: CreateViewStatement<Aug>,
2633) -> Result<Plan, PlanError> {
2634    let CreateViewStatement {
2635        temporary,
2636        if_exists,
2637        definition,
2638    } = &mut stmt;
2639    let (name, view) = plan_view(scx, definition, *temporary)?;
2640
2641    // Override the statement-level IfExistsBehavior with Skip if this is
2642    // explicitly requested in the PlanContext (the default is `false`).
2643    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2644
2645    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2646        let if_exists = true;
2647        let cascade = false;
2648        let maybe_item_to_drop = plan_drop_item(
2649            scx,
2650            ObjectType::View,
2651            if_exists,
2652            definition.name.clone(),
2653            cascade,
2654        )?;
2655
2656        // Check if the new View depends on the item that we would be replacing.
2657        if let Some(id) = maybe_item_to_drop {
2658            let dependencies = view.expr.depends_on();
2659            let invalid_drop = scx
2660                .get_item(&id)
2661                .global_ids()
2662                .any(|gid| dependencies.contains(&gid));
2663            if invalid_drop {
2664                let item = scx.catalog.get_item(&id);
2665                sql_bail!(
2666                    "cannot replace view {0}: depended upon by new {0} definition",
2667                    scx.catalog.resolve_full_name(item.name())
2668                );
2669            }
2670
2671            Some(id)
2672        } else {
2673            None
2674        }
2675    } else {
2676        None
2677    };
2678    let drop_ids = replace
2679        .map(|id| {
2680            scx.catalog
2681                .item_dependents(id)
2682                .into_iter()
2683                .map(|id| id.unwrap_item_id())
2684                .collect()
2685        })
2686        .unwrap_or_default();
2687
2688    validate_view_dependencies(scx, &view.dependencies.0)?;
2689
2690    // Check for an object in the catalog with this same name
2691    let full_name = scx.catalog.resolve_full_name(&name);
2692    let partial_name = PartialItemName::from(full_name.clone());
2693    // For PostgreSQL compatibility, we need to prevent creating views when
2694    // there is an existing object *or* type of the same name.
2695    if let (Ok(item), IfExistsBehavior::Error, false) = (
2696        scx.catalog.resolve_item_or_type(&partial_name),
2697        *if_exists,
2698        ignore_if_exists_errors,
2699    ) {
2700        return Err(PlanError::ItemAlreadyExists {
2701            name: full_name.to_string(),
2702            item_type: item.item_type(),
2703        });
2704    }
2705
2706    Ok(Plan::CreateView(CreateViewPlan {
2707        name,
2708        view,
2709        replace,
2710        drop_ids,
2711        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2712        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2713    }))
2714}
2715
2716/// Validate the dependencies of a (materialized) view.
2717fn validate_view_dependencies(
2718    scx: &StatementContext,
2719    dependencies: &BTreeSet<CatalogItemId>,
2720) -> Result<(), PlanError> {
2721    for id in dependencies {
2722        let item = scx.catalog.get_item(id);
2723        if item.replacement_target().is_some() {
2724            let name = scx.catalog.minimal_qualification(item.name());
2725            return Err(PlanError::InvalidDependency {
2726                name: name.to_string(),
2727                item_type: format!("replacement {}", item.item_type()),
2728            });
2729        }
2730    }
2731
2732    Ok(())
2733}
2734
2735pub fn describe_create_materialized_view(
2736    _: &StatementContext,
2737    _: CreateMaterializedViewStatement<Aug>,
2738) -> Result<StatementDesc, PlanError> {
2739    Ok(StatementDesc::new(None))
2740}
2741
2742pub fn describe_create_network_policy(
2743    _: &StatementContext,
2744    _: CreateNetworkPolicyStatement<Aug>,
2745) -> Result<StatementDesc, PlanError> {
2746    Ok(StatementDesc::new(None))
2747}
2748
2749pub fn describe_alter_network_policy(
2750    _: &StatementContext,
2751    _: AlterNetworkPolicyStatement<Aug>,
2752) -> Result<StatementDesc, PlanError> {
2753    Ok(StatementDesc::new(None))
2754}
2755
2756pub fn plan_create_materialized_view(
2757    scx: &StatementContext,
2758    mut stmt: CreateMaterializedViewStatement<Aug>,
2759) -> Result<Plan, PlanError> {
2760    let cluster_id =
2761        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2762    stmt.in_cluster = Some(ResolvedClusterName {
2763        id: cluster_id,
2764        print_name: None,
2765    });
2766
2767    let target_replica = match &stmt.in_cluster_replica {
2768        Some(replica_name) => {
2769            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;
2770
2771            let cluster = scx.catalog.get_cluster(cluster_id);
2772            let replica_id = cluster
2773                .replica_ids()
2774                .get(replica_name.as_str())
2775                .copied()
2776                .ok_or_else(|| {
2777                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
2778                })?;
2779            Some(replica_id)
2780        }
2781        None => None,
2782    };
2783
2784    let create_sql =
2785        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2786
2787    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2788    let name = scx.allocate_qualified_name(partial_name.clone())?;
2789
2790    let query::PlannedRootQuery {
2791        expr,
2792        mut desc,
2793        finishing,
2794        scope: _,
2795    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2796    // We get back a trivial finishing, see comment in `plan_view`.
2797    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2798        &finishing,
2799        expr.arity()
2800    ));
2801    if expr.contains_parameters()? {
2802        return Err(PlanError::ParameterNotAllowed(
2803            "materialized views".to_string(),
2804        ));
2805    }
2806
2807    plan_utils::maybe_rename_columns(
2808        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2809        &mut desc,
2810        &stmt.columns,
2811    )?;
2812    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2813
2814    let MaterializedViewOptionExtracted {
2815        assert_not_null,
2816        partition_by,
2817        retain_history,
2818        refresh,
2819        seen: _,
2820    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2821
2822    if let Some(partition_by) = partition_by {
2823        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2824        check_partition_by(&desc, partition_by)?;
2825    }
2826
2827    let refresh_schedule = {
2828        let mut refresh_schedule = RefreshSchedule::default();
2829        let mut on_commits_seen = 0;
2830        for refresh_option_value in refresh {
2831            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2832                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2833            }
2834            match refresh_option_value {
2835                RefreshOptionValue::OnCommit => {
2836                    on_commits_seen += 1;
2837                }
2838                RefreshOptionValue::AtCreation => {
2839                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2840                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2841                }
2842                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2843                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2844                    let ecx = &ExprContext {
2845                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2846                        name: "REFRESH AT",
2847                        scope: &Scope::empty(),
2848                        relation_type: &SqlRelationType::empty(),
2849                        allow_aggregates: false,
2850                        allow_subqueries: false,
2851                        allow_parameters: false,
2852                        allow_windows: false,
2853                    };
2854                    let hir = plan_expr(ecx, &time)?.cast_to(
2855                        ecx,
2856                        CastContext::Assignment,
2857                        &SqlScalarType::MzTimestamp,
2858                    )?;
2859                    // (mz_now was purified away to a literal earlier)
2860                    let timestamp = hir
2861                        .into_literal_mz_timestamp()
2862                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2863                    refresh_schedule.ats.push(timestamp);
2864                }
2865                RefreshOptionValue::Every(RefreshEveryOptionValue {
2866                    interval,
2867                    aligned_to,
2868                }) => {
2869                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2870                    if interval.as_microseconds() <= 0 {
2871                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2872                    }
2873                    if interval.months != 0 {
2874                        // This limitation is because we want Intervals to be cleanly convertable
2875                        // to a unix epoch timestamp difference. When the interval involves months, then
2876                        // this is not true anymore, because months have variable lengths.
2877                        // See `Timestamp::round_up`.
2878                        sql_bail!("REFRESH interval must not involve units larger than days");
2879                    }
2880                    let interval = interval.duration()?;
2881                    if u64::try_from(interval.as_millis()).is_err() {
2882                        sql_bail!("REFRESH interval too large");
2883                    }
2884
2885                    let mut aligned_to = match aligned_to {
2886                        Some(aligned_to) => aligned_to,
2887                        None => {
2888                            soft_panic_or_log!(
2889                                "ALIGNED TO should have been filled in by purification"
2890                            );
2891                            sql_bail!(
2892                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2893                            )
2894                        }
2895                    };
2896
2897                    // Desugar the `aligned_to` expression
2898                    transform_ast::transform(scx, &mut aligned_to)?;
2899
2900                    let ecx = &ExprContext {
2901                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2902                        name: "REFRESH EVERY ... ALIGNED TO",
2903                        scope: &Scope::empty(),
2904                        relation_type: &SqlRelationType::empty(),
2905                        allow_aggregates: false,
2906                        allow_subqueries: false,
2907                        allow_parameters: false,
2908                        allow_windows: false,
2909                    };
2910                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2911                        ecx,
2912                        CastContext::Assignment,
2913                        &SqlScalarType::MzTimestamp,
2914                    )?;
2915                    // (mz_now was purified away to a literal earlier)
2916                    let aligned_to_const = aligned_to_hir
2917                        .into_literal_mz_timestamp()
2918                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2919
2920                    refresh_schedule.everies.push(RefreshEvery {
2921                        interval,
2922                        aligned_to: aligned_to_const,
2923                    });
2924                }
2925            }
2926        }
2927
2928        if on_commits_seen > 1 {
2929            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2930        }
2931        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2932            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2933        }
2934
2935        if refresh_schedule == RefreshSchedule::default() {
2936            None
2937        } else {
2938            Some(refresh_schedule)
2939        }
2940    };
2941
2942    let as_of = stmt.as_of.map(Timestamp::from);
2943    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2944    let mut non_null_assertions = assert_not_null
2945        .into_iter()
2946        .map(normalize::column_name)
2947        .map(|assertion_name| {
2948            column_names
2949                .iter()
2950                .position(|col| col == &assertion_name)
2951                .ok_or_else(|| {
2952                    sql_err!(
2953                        "column {} in ASSERT NOT NULL option not found",
2954                        assertion_name.quoted()
2955                    )
2956                })
2957        })
2958        .collect::<Result<Vec<_>, _>>()?;
2959    non_null_assertions.sort();
2960    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2961        let dup = &column_names[*dup];
2962        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2963    }
2964
2965    if let Some(dup) = column_names.iter().duplicates().next() {
2966        sql_bail!("column {} specified more than once", dup.quoted());
2967    }
2968
2969    // Override the statement-level IfExistsBehavior with Skip if this is
2970    // explicitly requested in the PlanContext (the default is `false`).
2971    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2972        Ok(true) => IfExistsBehavior::Skip,
2973        _ => stmt.if_exists,
2974    };
2975
2976    let mut replace = None;
2977    let mut if_not_exists = false;
2978    match if_exists {
2979        IfExistsBehavior::Replace => {
2980            let if_exists = true;
2981            let cascade = false;
2982            let replace_id = plan_drop_item(
2983                scx,
2984                ObjectType::MaterializedView,
2985                if_exists,
2986                partial_name.clone().into(),
2987                cascade,
2988            )?;
2989
2990            // Check if the new Materialized View depends on the item that we would be replacing.
2991            if let Some(id) = replace_id {
2992                let dependencies = expr.depends_on();
2993                let invalid_drop = scx
2994                    .get_item(&id)
2995                    .global_ids()
2996                    .any(|gid| dependencies.contains(&gid));
2997                if invalid_drop {
2998                    let item = scx.catalog.get_item(&id);
2999                    sql_bail!(
3000                        "cannot replace materialized view {0}: depended upon by new {0} definition",
3001                        scx.catalog.resolve_full_name(item.name())
3002                    );
3003                }
3004                replace = Some(id);
3005            }
3006        }
3007        IfExistsBehavior::Skip => if_not_exists = true,
3008        IfExistsBehavior::Error => (),
3009    }
3010    let drop_ids = replace
3011        .map(|id| {
3012            scx.catalog
3013                .item_dependents(id)
3014                .into_iter()
3015                .map(|id| id.unwrap_item_id())
3016                .collect()
3017        })
3018        .unwrap_or_default();
3019    let mut dependencies: BTreeSet<_> = expr
3020        .depends_on()
3021        .into_iter()
3022        .map(|gid| scx.catalog.resolve_item_id(&gid))
3023        .collect();
3024
3025    // Validate the replacement target, if one is given.
3026    let mut replacement_target = None;
3027    if let Some(target_name) = &stmt.replacement_for {
3028        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
3029
3030        let target = scx.get_item_by_resolved_name(target_name)?;
3031        if target.item_type() != CatalogItemType::MaterializedView {
3032            return Err(PlanError::InvalidReplacement {
3033                item_type: target.item_type(),
3034                item_name: scx.catalog.minimal_qualification(target.name()),
3035                replacement_type: CatalogItemType::MaterializedView,
3036                replacement_name: partial_name,
3037            });
3038        }
3039        if target.id().is_system() {
3040            sql_bail!(
3041                "cannot replace {} because it is required by the database system",
3042                scx.catalog.minimal_qualification(target.name()),
3043            );
3044        }
3045
3046        // Check for dependency cycles.
3047        for dependent in scx.catalog.item_dependents(target.id()) {
3048            if let ObjectId::Item(id) = dependent
3049                && dependencies.contains(&id)
3050            {
3051                sql_bail!(
3052                    "replacement would cause {} to depend on itself",
3053                    scx.catalog.minimal_qualification(target.name()),
3054                );
3055            }
3056        }
3057
3058        dependencies.insert(target.id());
3059
3060        for use_id in target.used_by() {
3061            let use_item = scx.get_item(use_id);
3062            if use_item.replacement_target() == Some(target.id()) {
3063                sql_bail!(
3064                    "cannot replace {} because it already has a replacement: {}",
3065                    scx.catalog.minimal_qualification(target.name()),
3066                    scx.catalog.minimal_qualification(use_item.name()),
3067                );
3068            }
3069        }
3070
3071        replacement_target = Some(target.id());
3072    }
3073
3074    validate_view_dependencies(scx, &dependencies)?;
3075
3076    // Check for an object in the catalog with this same name
3077    let full_name = scx.catalog.resolve_full_name(&name);
3078    let partial_name = PartialItemName::from(full_name.clone());
3079    // For PostgreSQL compatibility, we need to prevent creating materialized
3080    // views when there is an existing object *or* type of the same name.
3081    if let (IfExistsBehavior::Error, Ok(item)) =
3082        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3083    {
3084        return Err(PlanError::ItemAlreadyExists {
3085            name: full_name.to_string(),
3086            item_type: item.item_type(),
3087        });
3088    }
3089
3090    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3091        name,
3092        materialized_view: MaterializedView {
3093            create_sql,
3094            expr,
3095            dependencies: DependencyIds(dependencies),
3096            column_names,
3097            replacement_target,
3098            cluster_id,
3099            target_replica,
3100            non_null_assertions,
3101            compaction_window,
3102            refresh_schedule,
3103            as_of,
3104        },
3105        replace,
3106        drop_ids,
3107        if_not_exists,
3108        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3109    }))
3110}
3111
// Declares how `MaterializedViewOption` values are extracted: each tuple pairs an option
// name with the Rust type its value is converted into.
// NOTE(review): by analogy with `CreateSinkOptionExtracted` elsewhere in this file, this
// presumably generates a `MaterializedViewOptionExtracted` struct; `AllowMultiple`
// presumably permits the option to be repeated. Confirm against the macro definition.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3119
3120pub fn describe_create_sink(
3121    _: &StatementContext,
3122    _: CreateSinkStatement<Aug>,
3123) -> Result<StatementDesc, PlanError> {
3124    Ok(StatementDesc::new(None))
3125}
3126
// Declares how `CreateSinkOption` values are extracted: each tuple pairs an option name
// with the Rust type its value is converted into. `plan_sink` consumes the result as
// `CreateSinkOptionExtracted`, reading the `snapshot`, `version` and `commit_interval`
// fields and ignoring `partition_strategy`.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3134
3135pub fn plan_create_sink(
3136    scx: &StatementContext,
3137    stmt: CreateSinkStatement<Aug>,
3138) -> Result<Plan, PlanError> {
3139    // Check for an object in the catalog with this same name
3140    let Some(name) = stmt.name.clone() else {
3141        return Err(PlanError::MissingName(CatalogItemType::Sink));
3142    };
3143    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3144    let full_name = scx.catalog.resolve_full_name(&name);
3145    let partial_name = PartialItemName::from(full_name.clone());
3146    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3147        return Err(PlanError::ItemAlreadyExists {
3148            name: full_name.to_string(),
3149            item_type: item.item_type(),
3150        });
3151    }
3152
3153    plan_sink(scx, stmt)
3154}
3155
3156/// This function will plan a sink as if it does not exist in the catalog. This is so the planning
3157/// logic is reused by both CREATE SINK and ALTER SINK planning. It is the responsibility of the
3158/// callers (plan_create_sink and plan_alter_sink) to check for name collisions if this is
3159/// important.
3160fn plan_sink(
3161    scx: &StatementContext,
3162    mut stmt: CreateSinkStatement<Aug>,
3163) -> Result<Plan, PlanError> {
3164    let CreateSinkStatement {
3165        name,
3166        in_cluster: _,
3167        from,
3168        connection,
3169        format,
3170        envelope,
3171        mode,
3172        if_not_exists,
3173        with_options,
3174    } = stmt.clone();
3175
3176    let Some(name) = name else {
3177        return Err(PlanError::MissingName(CatalogItemType::Sink));
3178    };
3179    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3180
3181    let envelope = match (&connection, envelope, mode) {
3182        // Kafka sinks use ENVELOPE
3183        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3184            SinkEnvelope::Upsert
3185        }
3186        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3187            SinkEnvelope::Debezium
3188        }
3189        (CreateSinkConnection::Kafka { .. }, None, None) => {
3190            sql_bail!("ENVELOPE clause is required")
3191        }
3192        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3193            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3194        }
3195        // Iceberg sinks use MODE
3196        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3197            SinkEnvelope::Upsert
3198        }
3199        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
3200            SinkEnvelope::Append
3201        }
3202        (CreateSinkConnection::Iceberg { .. }, None, None) => {
3203            sql_bail!("MODE clause is required")
3204        }
3205        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3206            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3207        }
3208    };
3209
3210    let from_name = &from;
3211    let from = scx.get_item_by_resolved_name(&from)?;
3212
3213    {
3214        use CatalogItemType::*;
3215        match from.item_type() {
3216            Table | Source | MaterializedView => {
3217                if from.replacement_target().is_some() {
3218                    let name = scx.catalog.minimal_qualification(from.name());
3219                    return Err(PlanError::InvalidSinkFrom {
3220                        name: name.to_string(),
3221                        item_type: format!("replacement {}", from.item_type()),
3222                    });
3223                }
3224            }
3225            Sink | View | Index | Type | Func | Secret | Connection => {
3226                let name = scx.catalog.minimal_qualification(from.name());
3227                return Err(PlanError::InvalidSinkFrom {
3228                    name: name.to_string(),
3229                    item_type: from.item_type().to_string(),
3230                });
3231            }
3232        }
3233    }
3234
3235    if from.id().is_system() {
3236        bail_unsupported!("creating a sink directly on a catalog object");
3237    }
3238
3239    let desc = from
3240        .relation_desc()
3241        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
3242    let key_indices = match &connection {
3243        CreateSinkConnection::Kafka { key: Some(key), .. }
3244        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3245            let key_columns = key
3246                .key_columns
3247                .clone()
3248                .into_iter()
3249                .map(normalize::column_name)
3250                .collect::<Vec<_>>();
3251            let mut uniq = BTreeSet::new();
3252            for col in key_columns.iter() {
3253                if !uniq.insert(col) {
3254                    sql_bail!("duplicate column referenced in KEY: {}", col);
3255                }
3256            }
3257            let indices = key_columns
3258                .iter()
3259                .map(|col| {
3260                    let name_idx =
3261                        desc.get_by_name(col)
3262                            .map(|(idx, _type)| idx)
3263                            .ok_or_else(|| {
3264                                sql_err!("column referenced in KEY does not exist: {}", col)
3265                            })?;
3266                    if desc.get_unambiguous_name(name_idx).is_none() {
3267                        sql_bail!("column referenced in KEY is ambiguous: {}", col);
3268                    }
3269                    Ok(name_idx)
3270                })
3271                .collect::<Result<Vec<_>, _>>()?;
3272
3273            // Iceberg equality deletes require primitive, non-float key columns.
3274            // Use an allow-list so that new types are rejected by default.
3275            if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
3276                let cols: Vec<_> = desc.iter().collect();
3277                for &idx in &indices {
3278                    let (col_name, col_type) = cols[idx];
3279                    let scalar = &col_type.scalar_type;
3280                    let is_valid = matches!(
3281                        scalar,
3282                        // integers
3283                        SqlScalarType::Bool
3284                            | SqlScalarType::Int16
3285                            | SqlScalarType::Int32
3286                            | SqlScalarType::Int64
3287                            | SqlScalarType::UInt16
3288                            | SqlScalarType::UInt32
3289                            | SqlScalarType::UInt64
3290                            // decimal / numeric
3291                            | SqlScalarType::Numeric { .. }
3292                            // date / time
3293                            | SqlScalarType::Date
3294                            | SqlScalarType::Time
3295                            | SqlScalarType::Timestamp { .. }
3296                            | SqlScalarType::TimestampTz { .. }
3297                            | SqlScalarType::Interval
3298                            | SqlScalarType::MzTimestamp
3299                            // string-like
3300                            | SqlScalarType::String
3301                            | SqlScalarType::Char { .. }
3302                            | SqlScalarType::VarChar { .. }
3303                            | SqlScalarType::PgLegacyChar
3304                            | SqlScalarType::PgLegacyName
3305                            | SqlScalarType::Bytes
3306                            | SqlScalarType::Jsonb
3307                            // identifiers
3308                            | SqlScalarType::Uuid
3309                            | SqlScalarType::Oid
3310                            | SqlScalarType::RegProc
3311                            | SqlScalarType::RegType
3312                            | SqlScalarType::RegClass
3313                            | SqlScalarType::MzAclItem
3314                            | SqlScalarType::AclItem
3315                            | SqlScalarType::Int2Vector
3316                            // ranges
3317                            | SqlScalarType::Range { .. }
3318                    );
3319                    if !is_valid {
3320                        return Err(PlanError::IcebergSinkUnsupportedKeyType {
3321                            column: col_name.to_string(),
3322                            column_type: format!("{:?}", scalar),
3323                        });
3324                    }
3325                }
3326            }
3327
3328            let is_valid_key = desc
3329                .typ()
3330                .keys
3331                .iter()
3332                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3333
3334            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3335                if key.not_enforced {
3336                    scx.catalog
3337                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3338                            key: key_columns.clone(),
3339                            name: name.item.clone(),
3340                        })
3341                } else {
3342                    return Err(PlanError::UpsertSinkWithInvalidKey {
3343                        name: from_name.full_name_str(),
3344                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3345                        valid_keys: desc
3346                            .typ()
3347                            .keys
3348                            .iter()
3349                            .map(|key| {
3350                                key.iter()
3351                                    .map(|col| desc.get_name(*col).as_str().into())
3352                                    .collect()
3353                            })
3354                            .collect(),
3355                    });
3356                }
3357            }
3358            Some(indices)
3359        }
3360        CreateSinkConnection::Kafka { key: None, .. }
3361        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3362    };
3363
3364    if key_indices.is_some() && envelope == SinkEnvelope::Append {
3365        sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
3366    }
3367
3368    // Reject input columns that clash with the columns MODE APPEND adds to the Iceberg table.
3369    if envelope == SinkEnvelope::Append {
3370        if let CreateSinkConnection::Iceberg { .. } = &connection {
3371            use mz_storage_types::sinks::{
3372                ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
3373            };
3374            for (col_name, _) in desc.iter() {
3375                if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
3376                    || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
3377                {
3378                    sql_bail!(
3379                        "column {} conflicts with the system column that MODE APPEND \
3380                         adds to the Iceberg table",
3381                        col_name.quoted()
3382                    );
3383                }
3384            }
3385        }
3386    }
3387
3388    let headers_index = match &connection {
3389        CreateSinkConnection::Kafka {
3390            headers: Some(headers),
3391            ..
3392        } => {
3393            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3394
3395            match envelope {
3396                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3397                SinkEnvelope::Debezium => {
3398                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3399                }
3400            };
3401
3402            let headers = normalize::column_name(headers.clone());
3403            let (idx, ty) = desc
3404                .get_by_name(&headers)
3405                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3406
3407            if desc.get_unambiguous_name(idx).is_none() {
3408                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3409            }
3410
3411            match &ty.scalar_type {
3412                SqlScalarType::Map { value_type, .. }
3413                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3414                _ => sql_bail!(
3415                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3416                ),
3417            }
3418
3419            Some(idx)
3420        }
3421        _ => None,
3422    };
3423
3424    // pick the first valid natural relation key, if any
3425    let relation_key_indices = desc.typ().keys.get(0).cloned();
3426
3427    let key_desc_and_indices = key_indices.map(|key_indices| {
3428        let cols = desc
3429            .iter()
3430            .map(|(name, ty)| (name.clone(), ty.clone()))
3431            .collect::<Vec<_>>();
3432        let (names, types): (Vec<_>, Vec<_>) =
3433            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3434        let typ = SqlRelationType::new(types);
3435        (RelationDesc::new(typ, names), key_indices)
3436    });
3437
3438    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3439        return Err(PlanError::UpsertSinkWithoutKey);
3440    }
3441
3442    let CreateSinkOptionExtracted {
3443        snapshot,
3444        version,
3445        partition_strategy: _,
3446        seen: _,
3447        commit_interval,
3448    } = with_options.try_into()?;
3449
3450    let connection_builder = match connection {
3451        CreateSinkConnection::Kafka {
3452            connection,
3453            options,
3454            ..
3455        } => kafka_sink_builder(
3456            scx,
3457            connection,
3458            options,
3459            format,
3460            relation_key_indices,
3461            key_desc_and_indices,
3462            headers_index,
3463            desc.into_owned(),
3464            envelope,
3465            from.id(),
3466            commit_interval,
3467        )?,
3468        CreateSinkConnection::Iceberg {
3469            connection,
3470            aws_connection,
3471            options,
3472            ..
3473        } => iceberg_sink_builder(
3474            scx,
3475            connection,
3476            aws_connection,
3477            options,
3478            relation_key_indices,
3479            key_desc_and_indices,
3480            commit_interval,
3481            &desc,
3482        )?,
3483    };
3484
3485    // WITH SNAPSHOT defaults to true
3486    let with_snapshot = snapshot.unwrap_or(true);
3487    // VERSION defaults to 0
3488    let version = version.unwrap_or(0);
3489
3490    // We will rewrite the cluster if one is not provided, so we must use the
3491    // `in_cluster` value we plan to normalize when we canonicalize the create
3492    // statement.
3493    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3494    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3495
3496    Ok(Plan::CreateSink(CreateSinkPlan {
3497        name,
3498        sink: Sink {
3499            create_sql,
3500            from: from.global_id(),
3501            connection: connection_builder,
3502            envelope,
3503            version,
3504            commit_interval,
3505        },
3506        with_snapshot,
3507        if_not_exists,
3508        in_cluster: in_cluster.id(),
3509    }))
3510}
3511
3512fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3513    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3514
3515    let existing_keys = desc
3516        .typ()
3517        .keys
3518        .iter()
3519        .map(|key_columns| {
3520            key_columns
3521                .iter()
3522                .map(|col| desc.get_name(*col).as_str())
3523                .join(", ")
3524        })
3525        .join(", ");
3526
3527    sql_err!(
3528        "Key constraint ({}) conflicts with existing key ({})",
3529        user_keys,
3530        existing_keys
3531    )
3532}
3533
3534/// Creating this by hand instead of using generate_extracted_config! macro
3535/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3536#[derive(Debug, Default, PartialEq, Clone)]
3537pub struct CsrConfigOptionExtracted {
3538    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3539    pub(crate) avro_key_fullname: Option<String>,
3540    pub(crate) avro_value_fullname: Option<String>,
3541    pub(crate) null_defaults: bool,
3542    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3543    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3544    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3545    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3546}
3547
3548impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3549    type Error = crate::plan::PlanError;
3550    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3551        let mut extracted = CsrConfigOptionExtracted::default();
3552        let mut common_doc_comments = BTreeMap::new();
3553        for option in v {
3554            if !extracted.seen.insert(option.name.clone()) {
3555                return Err(PlanError::Unstructured({
3556                    format!("{} specified more than once", option.name)
3557                }));
3558            }
3559            let option_name = option.name.clone();
3560            let option_name_str = option_name.to_ast_string_simple();
3561            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3562                option_name: option_name.to_ast_string_simple(),
3563                err: e.into(),
3564            };
3565            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3566                val.map(|s| match s {
3567                    WithOptionValue::Value(Value::String(s)) => {
3568                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3569                    }
3570                    _ => Err("must be a string".to_string()),
3571                })
3572                .transpose()
3573                .map_err(PlanError::Unstructured)
3574                .map_err(better_error)
3575            };
3576            match option.name {
3577                CsrConfigOptionName::AvroKeyFullname => {
3578                    extracted.avro_key_fullname =
3579                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3580                }
3581                CsrConfigOptionName::AvroValueFullname => {
3582                    extracted.avro_value_fullname =
3583                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3584                }
3585                CsrConfigOptionName::NullDefaults => {
3586                    extracted.null_defaults =
3587                        <bool>::try_from_value(option.value).map_err(better_error)?;
3588                }
3589                CsrConfigOptionName::AvroDocOn(doc_on) => {
3590                    let value = String::try_from_value(option.value.ok_or_else(|| {
3591                        PlanError::InvalidOptionValue {
3592                            option_name: option_name_str,
3593                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3594                        }
3595                    })?)
3596                    .map_err(better_error)?;
3597                    let key = match doc_on.identifier {
3598                        DocOnIdentifier::Column(ast::ColumnName {
3599                            relation: ResolvedItemName::Item { id, .. },
3600                            column: ResolvedColumnReference::Column { name, index: _ },
3601                        }) => DocTarget::Field {
3602                            object_id: id,
3603                            column_name: name,
3604                        },
3605                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3606                            DocTarget::Type(id)
3607                        }
3608                        _ => sql_bail!("invalid DOC ON identifier"),
3609                    };
3610
3611                    match doc_on.for_schema {
3612                        DocOnSchema::KeyOnly => {
3613                            extracted.key_doc_options.insert(key, value);
3614                        }
3615                        DocOnSchema::ValueOnly => {
3616                            extracted.value_doc_options.insert(key, value);
3617                        }
3618                        DocOnSchema::All => {
3619                            common_doc_comments.insert(key, value);
3620                        }
3621                    }
3622                }
3623                CsrConfigOptionName::KeyCompatibilityLevel => {
3624                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3625                }
3626                CsrConfigOptionName::ValueCompatibilityLevel => {
3627                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3628                }
3629            }
3630        }
3631
3632        for (key, value) in common_doc_comments {
3633            if !extracted.key_doc_options.contains_key(&key) {
3634                extracted.key_doc_options.insert(key.clone(), value.clone());
3635            }
3636            if !extracted.value_doc_options.contains_key(&key) {
3637                extracted.value_doc_options.insert(key, value);
3638            }
3639        }
3640        Ok(extracted)
3641    }
3642}
3643
3644fn iceberg_sink_builder(
3645    scx: &StatementContext,
3646    catalog_connection: ResolvedItemName,
3647    aws_connection: ResolvedItemName,
3648    options: Vec<IcebergSinkConfigOption<Aug>>,
3649    relation_key_indices: Option<Vec<usize>>,
3650    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3651    commit_interval: Option<Duration>,
3652    desc: &RelationDesc,
3653) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3654    // Reject types that arrow-rs's parquet writer cannot handle, before
3655    // sink creation. Pass the iceberg overrides so types iceberg remaps
3656    // (e.g. interval -> string) don't trip the check.
3657    ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
3658        .map_err(|e| sql_err!("{}", e))?;
3659    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3660    let catalog_connection_id = catalog_connection_item.id();
3661    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3662    let aws_connection_id = aws_connection_item.id();
3663    if !matches!(
3664        catalog_connection_item.connection()?,
3665        Connection::IcebergCatalog(_)
3666    ) {
3667        sql_bail!(
3668            "{} is not an iceberg catalog connection",
3669            scx.catalog
3670                .resolve_full_name(catalog_connection_item.name())
3671                .to_string()
3672                .quoted()
3673        );
3674    };
3675
3676    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3677        sql_bail!(
3678            "{} is not an AWS connection",
3679            scx.catalog
3680                .resolve_full_name(aws_connection_item.name())
3681                .to_string()
3682                .quoted()
3683        );
3684    }
3685
3686    let IcebergSinkConfigOptionExtracted {
3687        table,
3688        namespace,
3689        seen: _,
3690    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3691
3692    let Some(table) = table else {
3693        sql_bail!("Iceberg sink must specify TABLE");
3694    };
3695    let Some(namespace) = namespace else {
3696        sql_bail!("Iceberg sink must specify NAMESPACE");
3697    };
3698    if commit_interval.is_none() {
3699        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3700    }
3701
3702    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3703        catalog_connection_id,
3704        catalog_connection: catalog_connection_id,
3705        aws_connection_id,
3706        aws_connection: aws_connection_id,
3707        table,
3708        namespace,
3709        relation_key_indices,
3710        key_desc_and_indices,
3711    }))
3712}
3713
3714fn kafka_sink_builder(
3715    scx: &StatementContext,
3716    connection: ResolvedItemName,
3717    options: Vec<KafkaSinkConfigOption<Aug>>,
3718    format: Option<FormatSpecifier<Aug>>,
3719    relation_key_indices: Option<Vec<usize>>,
3720    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3721    headers_index: Option<usize>,
3722    value_desc: RelationDesc,
3723    envelope: SinkEnvelope,
3724    sink_from: CatalogItemId,
3725    commit_interval: Option<Duration>,
3726) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3727    // Get Kafka connection.
3728    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3729    let connection_id = connection_item.id();
3730    match connection_item.connection()? {
3731        Connection::Kafka(_) => (),
3732        _ => sql_bail!(
3733            "{} is not a kafka connection",
3734            scx.catalog.resolve_full_name(connection_item.name())
3735        ),
3736    };
3737
3738    if commit_interval.is_some() {
3739        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
3740    }
3741
3742    let KafkaSinkConfigOptionExtracted {
3743        topic,
3744        compression_type,
3745        partition_by,
3746        progress_group_id_prefix,
3747        transactional_id_prefix,
3748        legacy_ids,
3749        topic_config,
3750        topic_metadata_refresh_interval,
3751        topic_partition_count,
3752        topic_replication_factor,
3753        seen: _,
3754    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3755
3756    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3757        (Some(_), Some(true)) => {
3758            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3759        }
3760        (None, Some(true)) => KafkaIdStyle::Legacy,
3761        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3762    };
3763
3764    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3765        (Some(_), Some(true)) => {
3766            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3767        }
3768        (None, Some(true)) => KafkaIdStyle::Legacy,
3769        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3770    };
3771
3772    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3773
3774    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3775        // This is a librdkafka-enforced restriction that, if violated,
3776        // would result in a runtime error for the source.
3777        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3778    }
3779
3780    let assert_positive = |val: Option<i32>, name: &str| {
3781        if let Some(val) = val {
3782            if val <= 0 {
3783                sql_bail!("{} must be a positive integer", name);
3784            }
3785        }
3786        val.map(NonNeg::try_from)
3787            .transpose()
3788            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3789    };
3790    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3791    let topic_replication_factor =
3792        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3793
3794    // Helper method to parse avro connection options for format specifiers that use avro
3795    // for either key or value encoding.
3796    let gen_avro_schema_options = |conn| {
3797        let CsrConnectionAvro {
3798            connection:
3799                CsrConnection {
3800                    connection,
3801                    options,
3802                },
3803            seed,
3804            key_strategy,
3805            value_strategy,
3806        } = conn;
3807        if seed.is_some() {
3808            sql_bail!("SEED option does not make sense with sinks");
3809        }
3810        if key_strategy.is_some() {
3811            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3812        }
3813        if value_strategy.is_some() {
3814            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3815        }
3816
3817        let item = scx.get_item_by_resolved_name(&connection)?;
3818        let csr_connection = match item.connection()? {
3819            Connection::Csr(_) => item.id(),
3820            _ => {
3821                sql_bail!(
3822                    "{} is not a schema registry connection",
3823                    scx.catalog
3824                        .resolve_full_name(item.name())
3825                        .to_string()
3826                        .quoted()
3827                )
3828            }
3829        };
3830        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3831
3832        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3833            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3834        }
3835
3836        if key_desc_and_indices.is_some()
3837            && (extracted_options.avro_key_fullname.is_some()
3838                ^ extracted_options.avro_value_fullname.is_some())
3839        {
3840            sql_bail!(
3841                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3842            );
3843        }
3844
3845        Ok((csr_connection, extracted_options))
3846    };
3847
3848    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3849        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3850        Format::Bytes if desc.arity() == 1 => {
3851            let col_type = &desc.typ().column_types[0].scalar_type;
3852            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3853                bail_unsupported!(format!(
3854                    "BYTES format with non-encodable type: {:?}",
3855                    col_type
3856                ));
3857            }
3858
3859            Ok(KafkaSinkFormatType::Bytes)
3860        }
3861        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3862        Format::Bytes | Format::Text => {
3863            bail_unsupported!("BYTES or TEXT format with multiple columns")
3864        }
3865        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3866        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3867            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3868            let schema = if is_key {
3869                AvroSchemaGenerator::new(
3870                    desc.clone(),
3871                    false,
3872                    options.key_doc_options,
3873                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3874                    options.null_defaults,
3875                    Some(sink_from),
3876                    false,
3877                )?
3878                .schema()
3879                .to_string()
3880            } else {
3881                AvroSchemaGenerator::new(
3882                    desc.clone(),
3883                    matches!(envelope, SinkEnvelope::Debezium),
3884                    options.value_doc_options,
3885                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3886                    options.null_defaults,
3887                    Some(sink_from),
3888                    true,
3889                )?
3890                .schema()
3891                .to_string()
3892            };
3893            Ok(KafkaSinkFormatType::Avro {
3894                schema,
3895                compatibility_level: if is_key {
3896                    options.key_compatibility_level
3897                } else {
3898                    options.value_compatibility_level
3899                },
3900                csr_connection,
3901            })
3902        }
3903        format => bail_unsupported!(format!("sink format {:?}", format)),
3904    };
3905
3906    let partition_by = match &partition_by {
3907        Some(partition_by) => {
3908            let mut scope = Scope::from_source(None, value_desc.iter_names());
3909
3910            match envelope {
3911                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3912                SinkEnvelope::Debezium => {
3913                    let key_indices: HashSet<_> = key_desc_and_indices
3914                        .as_ref()
3915                        .map(|(_desc, indices)| indices.as_slice())
3916                        .unwrap_or_default()
3917                        .into_iter()
3918                        .collect();
3919                    for (i, item) in scope.items.iter_mut().enumerate() {
3920                        if !key_indices.contains(&i) {
3921                            item.error_if_referenced = Some(|_table, column| {
3922                                PlanError::InvalidPartitionByEnvelopeDebezium {
3923                                    column_name: column.to_string(),
3924                                }
3925                            });
3926                        }
3927                    }
3928                }
3929            };
3930
3931            let ecx = &ExprContext {
3932                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3933                name: "PARTITION BY",
3934                scope: &scope,
3935                relation_type: value_desc.typ(),
3936                allow_aggregates: false,
3937                allow_subqueries: false,
3938                allow_parameters: false,
3939                allow_windows: false,
3940            };
3941            let expr = plan_expr(ecx, partition_by)?.cast_to(
3942                ecx,
3943                CastContext::Assignment,
3944                &SqlScalarType::UInt64,
3945            )?;
3946            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
3947
3948            Some(expr)
3949        }
3950        _ => None,
3951    };
3952
3953    // Map from the format specifier of the statement to the individual key/value formats for the sink.
3954    let format = match format {
3955        Some(FormatSpecifier::KeyValue { key, value }) => {
3956            let key_format = match key_desc_and_indices.as_ref() {
3957                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
3958                None => None,
3959            };
3960            KafkaSinkFormat {
3961                value_format: map_format(value, &value_desc, false)?,
3962                key_format,
3963            }
3964        }
3965        Some(FormatSpecifier::Bare(format)) => {
3966            let key_format = match key_desc_and_indices.as_ref() {
3967                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
3968                None => None,
3969            };
3970            KafkaSinkFormat {
3971                value_format: map_format(format, &value_desc, false)?,
3972                key_format,
3973            }
3974        }
3975        None => bail_unsupported!("sink without format"),
3976    };
3977
3978    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
3979        connection_id,
3980        connection: connection_id,
3981        format,
3982        topic: topic_name,
3983        relation_key_indices,
3984        key_desc_and_indices,
3985        headers_index,
3986        value_desc,
3987        partition_by,
3988        compression_type,
3989        progress_group_id,
3990        transactional_id,
3991        topic_options: KafkaTopicOptions {
3992            partition_count: topic_partition_count,
3993            replication_factor: topic_replication_factor,
3994            topic_config: topic_config.unwrap_or_default(),
3995        },
3996        topic_metadata_refresh_interval,
3997    }))
3998}
3999
4000pub fn describe_create_index(
4001    _: &StatementContext,
4002    _: CreateIndexStatement<Aug>,
4003) -> Result<StatementDesc, PlanError> {
4004    Ok(StatementDesc::new(None))
4005}
4006
4007pub fn plan_create_index(
4008    scx: &StatementContext,
4009    mut stmt: CreateIndexStatement<Aug>,
4010) -> Result<Plan, PlanError> {
4011    let CreateIndexStatement {
4012        name,
4013        on_name,
4014        in_cluster,
4015        key_parts,
4016        with_options,
4017        if_not_exists,
4018    } = &mut stmt;
4019    let on = scx.get_item_by_resolved_name(on_name)?;
4020
4021    {
4022        use CatalogItemType::*;
4023        match on.item_type() {
4024            Table | Source | View | MaterializedView => {
4025                if on.replacement_target().is_some() {
4026                    sql_bail!(
4027                        "index cannot be created on {} because it is a replacement {}",
4028                        on_name.full_name_str(),
4029                        on.item_type(),
4030                    );
4031                }
4032            }
4033            Sink | Index | Type | Func | Secret | Connection => {
4034                sql_bail!(
4035                    "index cannot be created on {} because it is a {}",
4036                    on_name.full_name_str(),
4037                    on.item_type(),
4038                );
4039            }
4040        }
4041    }
4042
4043    let on_desc = on
4044        .relation_desc()
4045        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
4046
4047    let filled_key_parts = match key_parts {
4048        Some(kp) => kp.to_vec(),
4049        None => {
4050            // `key_parts` is None if we're creating a "default" index.
4051            // Precompute which column names are unambiguous in a single pass,
4052            // avoiding the O(n * k) cost of calling get_unambiguous_name per
4053            // key column.
4054            let mut name_counts = BTreeMap::new();
4055            for name in on_desc.iter_names() {
4056                *name_counts.entry(name).or_insert(0usize) += 1;
4057            }
4058            let key = on_desc.typ().default_key();
4059            key.iter()
4060                .map(|i| {
4061                    let name = on_desc.get_name(*i);
4062                    if name_counts.get(name).copied() == Some(1) {
4063                        Expr::Identifier(vec![name.clone().into()])
4064                    } else {
4065                        Expr::Value(Value::Number((i + 1).to_string()))
4066                    }
4067                })
4068                .collect()
4069        }
4070    };
4071    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4072
4073    let index_name = if let Some(name) = name {
4074        QualifiedItemName {
4075            qualifiers: on.name().qualifiers.clone(),
4076            item: normalize::ident(name.clone()),
4077        }
4078    } else {
4079        let mut idx_name = QualifiedItemName {
4080            qualifiers: on.name().qualifiers.clone(),
4081            item: on.name().item.clone(),
4082        };
4083        if key_parts.is_none() {
4084            // We're trying to create the "default" index.
4085            idx_name.item += "_primary_idx";
4086        } else {
4087            // Use PG schema for automatically naming indexes:
4088            // `<table>_<_-separated indexed expressions>_idx`
4089            let index_name_col_suffix = keys
4090                .iter()
4091                .map(|k| match k {
4092                    mz_expr::MirScalarExpr::Column(i, name) => {
4093                        match (on_desc.get_unambiguous_name(*i), &name.0) {
4094                            (Some(col_name), _) => col_name.to_string(),
4095                            (None, Some(name)) => name.to_string(),
4096                            (None, None) => format!("{}", i + 1),
4097                        }
4098                    }
4099                    _ => "expr".to_string(),
4100                })
4101                .join("_");
4102            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4103                .expect("write on strings cannot fail");
4104            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4105        }
4106
4107        if !*if_not_exists {
4108            scx.catalog.find_available_name(idx_name)
4109        } else {
4110            idx_name
4111        }
4112    };
4113
4114    // Check for an object in the catalog with this same name
4115    let full_name = scx.catalog.resolve_full_name(&index_name);
4116    let partial_name = PartialItemName::from(full_name.clone());
4117    // For PostgreSQL compatibility, we need to prevent creating indexes when
4118    // there is an existing object *or* type of the same name.
4119    //
4120    // Technically, we only need to prevent coexistence of indexes and types
4121    // that have an associated relation (record types but not list/map types).
4122    // Enforcing that would be more complicated, though. It's backwards
4123    // compatible to weaken this restriction in the future.
4124    if let (Ok(item), false, false) = (
4125        scx.catalog.resolve_item_or_type(&partial_name),
4126        *if_not_exists,
4127        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4128    ) {
4129        return Err(PlanError::ItemAlreadyExists {
4130            name: full_name.to_string(),
4131            item_type: item.item_type(),
4132        });
4133    }
4134
4135    let options = plan_index_options(scx, with_options.clone())?;
4136    let cluster_id = match in_cluster {
4137        None => scx.resolve_cluster(None)?.id(),
4138        Some(in_cluster) => in_cluster.id,
4139    };
4140
4141    *in_cluster = Some(ResolvedClusterName {
4142        id: cluster_id,
4143        print_name: None,
4144    });
4145
4146    // Normalize `stmt`.
4147    *name = Some(Ident::new(index_name.item.clone())?);
4148    *key_parts = Some(filled_key_parts);
4149    let if_not_exists = *if_not_exists;
4150
4151    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4152    let compaction_window = options.iter().find_map(|o| {
4153        #[allow(irrefutable_let_patterns)]
4154        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4155            Some(lcw.clone())
4156        } else {
4157            None
4158        }
4159    });
4160
4161    Ok(Plan::CreateIndex(CreateIndexPlan {
4162        name: index_name,
4163        index: Index {
4164            create_sql,
4165            on: on.global_id(),
4166            keys,
4167            cluster_id,
4168            compaction_window,
4169        },
4170        if_not_exists,
4171    }))
4172}
4173
4174pub fn describe_create_type(
4175    _: &StatementContext,
4176    _: CreateTypeStatement<Aug>,
4177) -> Result<StatementDesc, PlanError> {
4178    Ok(StatementDesc::new(None))
4179}
4180
4181pub fn plan_create_type(
4182    scx: &StatementContext,
4183    stmt: CreateTypeStatement<Aug>,
4184) -> Result<Plan, PlanError> {
4185    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4186    let CreateTypeStatement { name, as_type, .. } = stmt;
4187
4188    fn validate_data_type(
4189        scx: &StatementContext,
4190        data_type: ResolvedDataType,
4191        as_type: &str,
4192        key: &str,
4193    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4194        let (id, modifiers) = match data_type {
4195            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4196            _ => sql_bail!(
4197                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4198                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4199                as_type,
4200                key,
4201                data_type.human_readable_name(),
4202            ),
4203        };
4204
4205        let item = scx.catalog.get_item(&id);
4206        match item.type_details() {
4207            None => sql_bail!(
4208                "{} must be of class type, but received {} which is of class {}",
4209                key,
4210                scx.catalog.resolve_full_name(item.name()),
4211                item.item_type()
4212            ),
4213            Some(CatalogTypeDetails {
4214                typ: CatalogType::Char,
4215                ..
4216            }) => {
4217                bail_unsupported!("embedding char type in a list or map")
4218            }
4219            _ => {
4220                // Validate that the modifiers are actually valid.
4221                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4222
4223                Ok((id, modifiers))
4224            }
4225        }
4226    }
4227
4228    let inner = match as_type {
4229        CreateTypeAs::List { options } => {
4230            let CreateTypeListOptionExtracted {
4231                element_type,
4232                seen: _,
4233            } = CreateTypeListOptionExtracted::try_from(options)?;
4234            let element_type =
4235                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4236            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4237            CatalogType::List {
4238                element_reference: id,
4239                element_modifiers: modifiers,
4240            }
4241        }
4242        CreateTypeAs::Map { options } => {
4243            let CreateTypeMapOptionExtracted {
4244                key_type,
4245                value_type,
4246                seen: _,
4247            } = CreateTypeMapOptionExtracted::try_from(options)?;
4248            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4249            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4250            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4251            let (value_id, value_modifiers) =
4252                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4253            CatalogType::Map {
4254                key_reference: key_id,
4255                key_modifiers,
4256                value_reference: value_id,
4257                value_modifiers,
4258            }
4259        }
4260        CreateTypeAs::Record { column_defs } => {
4261            let mut fields = vec![];
4262            for column_def in column_defs {
4263                let data_type = column_def.data_type;
4264                let key = ident(column_def.name.clone());
4265                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4266                fields.push(CatalogRecordField {
4267                    name: ColumnName::from(key.clone()),
4268                    type_reference: id,
4269                    type_modifiers: modifiers,
4270                });
4271            }
4272            CatalogType::Record { fields }
4273        }
4274    };
4275
4276    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4277
4278    // Check for an object in the catalog with this same name
4279    let full_name = scx.catalog.resolve_full_name(&name);
4280    let partial_name = PartialItemName::from(full_name.clone());
4281    // For PostgreSQL compatibility, we need to prevent creating types when
4282    // there is an existing object *or* type of the same name.
4283    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4284        if item.item_type().conflicts_with_type() {
4285            return Err(PlanError::ItemAlreadyExists {
4286                name: full_name.to_string(),
4287                item_type: item.item_type(),
4288            });
4289        }
4290    }
4291
4292    Ok(Plan::CreateType(CreateTypePlan {
4293        name,
4294        typ: Type { create_sql, inner },
4295    }))
4296}
4297
// Option extraction for `CREATE TYPE ... AS LIST`. `plan_create_type` reads
// the result through `CreateTypeListOptionExtracted` (field `element_type`).
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4299
// Option extraction for `CREATE TYPE ... AS MAP`. `plan_create_type` reads the
// result through `CreateTypeMapOptionExtracted` (fields `key_type` and
// `value_type`).
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4305
/// A planned option for altering a role: either a set of role attributes or a
/// change to a single role variable.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Planned role attributes.
    Attributes(PlannedRoleAttributes),
    /// A planned `Set` or `Reset` of a role variable.
    Variable(PlannedRoleVariable),
}
4311
/// Role attributes as produced by `plan_role_attributes`.
///
/// `plan_role_attributes` starts with every field set to `None` and only
/// fills in the fields whose attribute appears in the option list.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// `Some(true)` for `Inherit`, `Some(false)` for `NoInherit`.
    pub inherit: Option<bool>,
    /// The password, when one was supplied.
    pub password: Option<Password>,
    /// Filled in from the `scram_iterations` system variable whenever
    /// `plan_role_attributes` records a password.
    pub scram_iterations: Option<NonZeroU32>,
    /// `nopassword` is set to true if the password from the parser is `None`.
    /// This is semantically different than not supplying a password at all,
    /// to allow for unsetting a password.
    pub nopassword: Option<bool>,
    /// `Some(true)` for `SuperUser`, `Some(false)` for `NoSuperUser`.
    pub superuser: Option<bool>,
    /// `Some(true)` for `Login`, `Some(false)` for `NoLogin`.
    pub login: Option<bool>,
}
4324
4325fn plan_role_attributes(
4326    options: Vec<RoleAttribute>,
4327    scx: &StatementContext,
4328) -> Result<PlannedRoleAttributes, PlanError> {
4329    let mut planned_attributes = PlannedRoleAttributes {
4330        inherit: None,
4331        password: None,
4332        scram_iterations: None,
4333        superuser: None,
4334        login: None,
4335        nopassword: None,
4336    };
4337
4338    for option in options {
4339        match option {
4340            RoleAttribute::Inherit | RoleAttribute::NoInherit
4341                if planned_attributes.inherit.is_some() =>
4342            {
4343                sql_bail!("conflicting or redundant options");
4344            }
4345            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4346                bail_never_supported!(
4347                    "CREATECLUSTER attribute",
4348                    "sql/create-role/#details",
4349                    "Use system privileges instead."
4350                );
4351            }
4352            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4353                bail_never_supported!(
4354                    "CREATEDB attribute",
4355                    "sql/create-role/#details",
4356                    "Use system privileges instead."
4357                );
4358            }
4359            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4360                bail_never_supported!(
4361                    "CREATEROLE attribute",
4362                    "sql/create-role/#details",
4363                    "Use system privileges instead."
4364                );
4365            }
4366            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4367                sql_bail!("conflicting or redundant options");
4368            }
4369
4370            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4371            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4372            RoleAttribute::Password(password) => {
4373                if let Some(password) = password {
4374                    planned_attributes.password = Some(password.into());
4375                    planned_attributes.scram_iterations =
4376                        Some(scx.catalog.system_vars().scram_iterations())
4377                } else {
4378                    planned_attributes.nopassword = Some(true);
4379                }
4380            }
4381            RoleAttribute::SuperUser => {
4382                if planned_attributes.superuser == Some(false) {
4383                    sql_bail!("conflicting or redundant options");
4384                }
4385                planned_attributes.superuser = Some(true);
4386            }
4387            RoleAttribute::NoSuperUser => {
4388                if planned_attributes.superuser == Some(true) {
4389                    sql_bail!("conflicting or redundant options");
4390                }
4391                planned_attributes.superuser = Some(false);
4392            }
4393            RoleAttribute::Login => {
4394                if planned_attributes.login == Some(false) {
4395                    sql_bail!("conflicting or redundant options");
4396                }
4397                planned_attributes.login = Some(true);
4398            }
4399            RoleAttribute::NoLogin => {
4400                if planned_attributes.login == Some(true) {
4401                    sql_bail!("conflicting or redundant options");
4402                }
4403                planned_attributes.login = Some(false);
4404            }
4405        }
4406    }
4407    if planned_attributes.inherit == Some(false) {
4408        bail_unsupported!("non inherit roles");
4409    }
4410
4411    Ok(planned_attributes)
4412}
4413
/// A planned change to a role variable.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Set the variable `name` to `value`.
    Set { name: String, value: VariableValue },
    /// Reset the variable `name`.
    Reset { name: String },
}
4419
4420impl PlannedRoleVariable {
4421    pub fn name(&self) -> &str {
4422        match self {
4423            PlannedRoleVariable::Set { name, .. } => name,
4424            PlannedRoleVariable::Reset { name } => name,
4425        }
4426    }
4427}
4428
4429fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4430    let plan = match variable {
4431        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4432            name: name.to_string(),
4433            value: scl::plan_set_variable_to(value)?,
4434        },
4435        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4436            name: name.to_string(),
4437        },
4438    };
4439    Ok(plan)
4440}
4441
4442pub fn describe_create_role(
4443    _: &StatementContext,
4444    _: CreateRoleStatement,
4445) -> Result<StatementDesc, PlanError> {
4446    Ok(StatementDesc::new(None))
4447}
4448
4449pub fn plan_create_role(
4450    scx: &StatementContext,
4451    CreateRoleStatement { name, options }: CreateRoleStatement,
4452) -> Result<Plan, PlanError> {
4453    let attributes = plan_role_attributes(options, scx)?;
4454    Ok(Plan::CreateRole(CreateRolePlan {
4455        name: normalize::ident(name),
4456        attributes: attributes.into(),
4457    }))
4458}
4459
4460pub fn plan_create_network_policy(
4461    ctx: &StatementContext,
4462    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4463) -> Result<Plan, PlanError> {
4464    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4465    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4466
4467    let Some(rule_defs) = policy_options.rules else {
4468        sql_bail!("RULES must be specified when creating network policies.");
4469    };
4470
4471    let mut rules = vec![];
4472    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4473        let NetworkPolicyRuleOptionExtracted {
4474            seen: _,
4475            direction,
4476            action,
4477            address,
4478        } = options.try_into()?;
4479        let (direction, action, address) = match (direction, action, address) {
4480            (Some(direction), Some(action), Some(address)) => (
4481                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4482                NetworkPolicyRuleAction::try_from(action.as_str())?,
4483                PolicyAddress::try_from(address.as_str())?,
4484            ),
4485            (_, _, _) => {
4486                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4487            }
4488        };
4489        rules.push(NetworkPolicyRule {
4490            name: normalize::ident(name),
4491            direction,
4492            action,
4493            address,
4494        });
4495    }
4496
4497    if rules.len()
4498        > ctx
4499            .catalog
4500            .system_vars()
4501            .max_rules_per_network_policy()
4502            .try_into()?
4503    {
4504        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4505    }
4506
4507    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4508        name: normalize::ident(name),
4509        rules,
4510    }))
4511}
4512
4513pub fn plan_alter_network_policy(
4514    ctx: &StatementContext,
4515    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4516) -> Result<Plan, PlanError> {
4517    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4518
4519    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4520    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4521
4522    let Some(rule_defs) = policy_options.rules else {
4523        sql_bail!("RULES must be specified when creating network policies.");
4524    };
4525
4526    let mut rules = vec![];
4527    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4528        let NetworkPolicyRuleOptionExtracted {
4529            seen: _,
4530            direction,
4531            action,
4532            address,
4533        } = options.try_into()?;
4534
4535        let (direction, action, address) = match (direction, action, address) {
4536            (Some(direction), Some(action), Some(address)) => (
4537                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4538                NetworkPolicyRuleAction::try_from(action.as_str())?,
4539                PolicyAddress::try_from(address.as_str())?,
4540            ),
4541            (_, _, _) => {
4542                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4543            }
4544        };
4545        rules.push(NetworkPolicyRule {
4546            name: normalize::ident(name),
4547            direction,
4548            action,
4549            address,
4550        });
4551    }
4552    if rules.len()
4553        > ctx
4554            .catalog
4555            .system_vars()
4556            .max_rules_per_network_policy()
4557            .try_into()?
4558    {
4559        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4560    }
4561
4562    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4563        id: policy.id(),
4564        name: normalize::ident(name),
4565        rules,
4566    }))
4567}
4568
4569pub fn describe_create_cluster(
4570    _: &StatementContext,
4571    _: CreateClusterStatement<Aug>,
4572) -> Result<StatementDesc, PlanError> {
4573    Ok(StatementDesc::new(None))
4574}
4575
// Extracted configuration for `ClusterOption`s, shared between the statements
// that create and alter clusters.
//
// WARNING:
// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
// that option stays the same. If you were to give a default value here, then not giving that option
// to ALTER CLUSTER would always reset the value of that option to the default.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);
4594
// Option extraction for network policy statements. The create and alter
// planners read the result through `NetworkPolicyOptionExtracted` (field
// `rules`), and both reject a missing `rules` value.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);
4599
// Option extraction for a single network policy rule. The planners read the
// result through `NetworkPolicyRuleOptionExtracted` (fields `direction`,
// `action`, and `address`), require all three, and parse each string into its
// typed form.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);
4606
// Option extraction for `ClusterAlterOption`, which has a single `Wait`
// option. NOTE(review): presumably consumed by the ALTER CLUSTER planner;
// confirm against its use site.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4608
// Option extraction for `ClusterAlterUntilReadyOption`: a `Timeout` duration
// and an `OnTimeout` string. NOTE(review): the accepted `OnTimeout` values are
// not visible here; check the planner that consumes this.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);
4614
// Option extraction for `ClusterFeature`. Every feature is an `Option<bool>`
// declared with `Default(None)`.
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4629
4630/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4631///
4632/// The reverse of [`unplan_create_cluster`].
4633pub fn plan_create_cluster(
4634    scx: &StatementContext,
4635    stmt: CreateClusterStatement<Aug>,
4636) -> Result<Plan, PlanError> {
4637    let plan = plan_create_cluster_inner(scx, stmt)?;
4638
4639    // Roundtrip through unplan and make sure that we end up with the same plan.
4640    if let CreateClusterVariant::Managed(_) = &plan.variant {
4641        let stmt = unplan_create_cluster(scx, plan.clone())
4642            .map_err(|e| PlanError::Replan(e.to_string()))?;
4643        let create_sql = stmt.to_ast_string_stable();
4644        let stmt = parse::parse(&create_sql)
4645            .map_err(|e| PlanError::Replan(e.to_string()))?
4646            .into_element()
4647            .ast;
4648        let (stmt, _resolved_ids) =
4649            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4650        let stmt = match stmt {
4651            Statement::CreateCluster(stmt) => stmt,
4652            stmt => {
4653                return Err(PlanError::Replan(format!(
4654                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4655                )));
4656            }
4657        };
4658        let replan =
4659            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4660        if plan != replan {
4661            return Err(PlanError::Replan(format!(
4662                "replan does not match: plan={plan:?}, replan={replan:?}"
4663            )));
4664        }
4665    }
4666
4667    Ok(Plan::CreateCluster(plan))
4668}
4669
4670pub fn plan_create_cluster_inner(
4671    scx: &StatementContext,
4672    CreateClusterStatement {
4673        name,
4674        options,
4675        features,
4676    }: CreateClusterStatement<Aug>,
4677) -> Result<CreateClusterPlan, PlanError> {
4678    let ClusterOptionExtracted {
4679        availability_zones,
4680        introspection_debugging,
4681        introspection_interval,
4682        managed,
4683        replicas,
4684        replication_factor,
4685        seen: _,
4686        size,
4687        disk,
4688        schedule,
4689        workload_class,
4690    }: ClusterOptionExtracted = options.try_into()?;
4691
4692    let managed = managed.unwrap_or_else(|| replicas.is_none());
4693
4694    if !scx.catalog.active_role_id().is_system() {
4695        if !features.is_empty() {
4696            sql_bail!("FEATURES not supported for non-system users");
4697        }
4698        if workload_class.is_some() {
4699            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4700        }
4701    }
4702
4703    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4704    let workload_class = workload_class.and_then(|v| v.0);
4705
4706    if managed {
4707        if replicas.is_some() {
4708            sql_bail!("REPLICAS not supported for managed clusters");
4709        }
4710        let Some(size) = size else {
4711            sql_bail!("SIZE must be specified for managed clusters");
4712        };
4713
4714        if disk.is_some() {
4715            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4716            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4717            // we'll be able to remove the `DISK` option entirely.
4718            if scx.catalog.is_cluster_size_cc(&size) {
4719                sql_bail!(
4720                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4721                );
4722            }
4723
4724            scx.catalog
4725                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4726        }
4727
4728        let compute = plan_compute_replica_config(
4729            introspection_interval,
4730            introspection_debugging.unwrap_or(false),
4731        )?;
4732
4733        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4734            replication_factor.unwrap_or_else(|| {
4735                scx.catalog
4736                    .system_vars()
4737                    .default_cluster_replication_factor()
4738            })
4739        } else {
4740            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4741            if replication_factor.is_some() {
4742                sql_bail!(
4743                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4744                );
4745            }
4746            // If we have a non-trivial schedule, then let's not have any replicas initially,
4747            // to avoid quickly going back and forth if the schedule doesn't want a replica
4748            // initially.
4749            0
4750        };
4751        let availability_zones = availability_zones.unwrap_or_default();
4752
4753        if !availability_zones.is_empty() {
4754            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4755        }
4756
4757        // Plan OptimizerFeatureOverrides.
4758        let ClusterFeatureExtracted {
4759            reoptimize_imported_views,
4760            enable_eager_delta_joins,
4761            enable_new_outer_join_lowering,
4762            enable_variadic_left_join_lowering,
4763            enable_letrec_fixpoint_analysis,
4764            enable_join_prioritize_arranged,
4765            enable_projection_pushdown_after_relation_cse,
4766            seen: _,
4767        } = ClusterFeatureExtracted::try_from(features)?;
4768        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4769            reoptimize_imported_views,
4770            enable_eager_delta_joins,
4771            enable_new_outer_join_lowering,
4772            enable_variadic_left_join_lowering,
4773            enable_letrec_fixpoint_analysis,
4774            enable_join_prioritize_arranged,
4775            enable_projection_pushdown_after_relation_cse,
4776            ..Default::default()
4777        };
4778
4779        let schedule = plan_cluster_schedule(schedule)?;
4780
4781        Ok(CreateClusterPlan {
4782            name: normalize::ident(name),
4783            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4784                replication_factor,
4785                size,
4786                availability_zones,
4787                compute,
4788                optimizer_feature_overrides,
4789                schedule,
4790            }),
4791            workload_class,
4792        })
4793    } else {
4794        let Some(replica_defs) = replicas else {
4795            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4796        };
4797        if availability_zones.is_some() {
4798            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4799        }
4800        if replication_factor.is_some() {
4801            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4802        }
4803        if introspection_debugging.is_some() {
4804            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4805        }
4806        if introspection_interval.is_some() {
4807            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4808        }
4809        if size.is_some() {
4810            sql_bail!("SIZE not supported for unmanaged clusters");
4811        }
4812        if disk.is_some() {
4813            sql_bail!("DISK not supported for unmanaged clusters");
4814        }
4815        if !features.is_empty() {
4816            sql_bail!("FEATURES not supported for unmanaged clusters");
4817        }
4818        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4819            sql_bail!(
4820                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4821            );
4822        }
4823
4824        let mut replicas = vec![];
4825        for ReplicaDefinition { name, options } in replica_defs {
4826            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4827        }
4828
4829        Ok(CreateClusterPlan {
4830            name: normalize::ident(name),
4831            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4832            workload_class,
4833        })
4834    }
4835}
4836
4837/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4838///
4839/// The reverse of [`plan_create_cluster`].
4840pub fn unplan_create_cluster(
4841    scx: &StatementContext,
4842    CreateClusterPlan {
4843        name,
4844        variant,
4845        workload_class,
4846    }: CreateClusterPlan,
4847) -> Result<CreateClusterStatement<Aug>, PlanError> {
4848    match variant {
4849        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4850            replication_factor,
4851            size,
4852            availability_zones,
4853            compute,
4854            optimizer_feature_overrides,
4855            schedule,
4856        }) => {
4857            let schedule = unplan_cluster_schedule(schedule);
4858            let OptimizerFeatureOverrides {
4859                enable_consolidate_after_union_negate: _,
4860                enable_reduce_mfp_fusion: _,
4861                enable_cardinality_estimates: _,
4862                persist_fast_path_limit: _,
4863                reoptimize_imported_views,
4864                enable_eager_delta_joins,
4865                enable_new_outer_join_lowering,
4866                enable_variadic_left_join_lowering,
4867                enable_letrec_fixpoint_analysis,
4868                enable_join_prioritize_arranged,
4869                enable_projection_pushdown_after_relation_cse,
4870                enable_less_reduce_in_eqprop: _,
4871                enable_dequadratic_eqprop_map: _,
4872                enable_eq_classes_withholding_errors: _,
4873                enable_fast_path_plan_insights: _,
4874                enable_cast_elimination: _,
4875                enable_case_literal_transform: _,
4876                enable_simplify_quantified_comparisons: _,
4877                enable_coalesce_case_transform: _,
4878            } = optimizer_feature_overrides;
4879            // The ones from above that don't occur below are not wired up to cluster features.
4880            let features_extracted = ClusterFeatureExtracted {
4881                // Seen is ignored when unplanning.
4882                seen: Default::default(),
4883                reoptimize_imported_views,
4884                enable_eager_delta_joins,
4885                enable_new_outer_join_lowering,
4886                enable_variadic_left_join_lowering,
4887                enable_letrec_fixpoint_analysis,
4888                enable_join_prioritize_arranged,
4889                enable_projection_pushdown_after_relation_cse,
4890            };
4891            let features = features_extracted.into_values(scx.catalog);
4892            let availability_zones = if availability_zones.is_empty() {
4893                None
4894            } else {
4895                Some(availability_zones)
4896            };
4897            let (introspection_interval, introspection_debugging) =
4898                unplan_compute_replica_config(compute);
4899            // Replication factor cannot be explicitly specified with a refresh schedule, it's
4900            // always 1 or less.
4901            let replication_factor = match &schedule {
4902                ClusterScheduleOptionValue::Manual => Some(replication_factor),
4903                ClusterScheduleOptionValue::Refresh { .. } => {
4904                    assert!(
4905                        replication_factor <= 1,
4906                        "replication factor, {replication_factor:?}, must be <= 1"
4907                    );
4908                    None
4909                }
4910            };
4911            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4912            let options_extracted = ClusterOptionExtracted {
4913                // Seen is ignored when unplanning.
4914                seen: Default::default(),
4915                availability_zones,
4916                disk: None,
4917                introspection_debugging: Some(introspection_debugging),
4918                introspection_interval,
4919                managed: Some(true),
4920                replicas: None,
4921                replication_factor,
4922                size: Some(size),
4923                schedule: Some(schedule),
4924                workload_class,
4925            };
4926            let options = options_extracted.into_values(scx.catalog);
4927            let name = Ident::new_unchecked(name);
4928            Ok(CreateClusterStatement {
4929                name,
4930                options,
4931                features,
4932            })
4933        }
4934        CreateClusterVariant::Unmanaged(_) => {
4935            bail_unsupported!("SHOW CREATE for unmanaged clusters")
4936        }
4937    }
4938}
4939
// Typed extraction for `ReplicaOption`; the generated `ReplicaOptionExtracted` struct is consumed
// by `plan_replica_config`. `Internal` and `IntrospectionDebugging` default to `false`; every
// other option has no default declared.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
4955
4956fn plan_replica_config(
4957    scx: &StatementContext,
4958    options: Vec<ReplicaOption<Aug>>,
4959) -> Result<ReplicaConfig, PlanError> {
4960    let ReplicaOptionExtracted {
4961        availability_zone,
4962        billed_as,
4963        computectl_addresses,
4964        disk,
4965        internal,
4966        introspection_debugging,
4967        introspection_interval,
4968        size,
4969        storagectl_addresses,
4970        ..
4971    }: ReplicaOptionExtracted = options.try_into()?;
4972
4973    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4974
4975    match (
4976        size,
4977        availability_zone,
4978        billed_as,
4979        storagectl_addresses,
4980        computectl_addresses,
4981    ) {
4982        // Common cases we expect end users to hit.
4983        (None, _, None, None, None) => {
4984            // We don't mention the unmanaged options in the error message
4985            // because they are only available in unsafe mode.
4986            sql_bail!("SIZE option must be specified");
4987        }
4988        (Some(size), availability_zone, billed_as, None, None) => {
4989            if disk.is_some() {
4990                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4991                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4992                // we'll be able to remove the `DISK` option entirely.
4993                if scx.catalog.is_cluster_size_cc(&size) {
4994                    sql_bail!(
4995                        "DISK option not supported for modern cluster sizes because disk is always enabled"
4996                    );
4997                }
4998
4999                scx.catalog
5000                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5001            }
5002
5003            Ok(ReplicaConfig::Orchestrated {
5004                size,
5005                availability_zone,
5006                compute,
5007                billed_as,
5008                internal,
5009            })
5010        }
5011
5012        (None, None, None, storagectl_addresses, computectl_addresses) => {
5013            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5014
5015            // When manually testing Materialize in unsafe mode, it's easy to
5016            // accidentally omit one of these options, so we try to produce
5017            // helpful error messages.
5018            let Some(storagectl_addrs) = storagectl_addresses else {
5019                sql_bail!("missing STORAGECTL ADDRESSES option");
5020            };
5021            let Some(computectl_addrs) = computectl_addresses else {
5022                sql_bail!("missing COMPUTECTL ADDRESSES option");
5023            };
5024
5025            if storagectl_addrs.len() != computectl_addrs.len() {
5026                sql_bail!(
5027                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5028                );
5029            }
5030
5031            if disk.is_some() {
5032                sql_bail!("DISK can't be specified for unorchestrated clusters");
5033            }
5034
5035            Ok(ReplicaConfig::Unorchestrated {
5036                storagectl_addrs,
5037                computectl_addrs,
5038                compute,
5039            })
5040        }
5041        _ => {
5042            // We don't bother trying to produce a more helpful error message
5043            // here because no user is likely to hit this path.
5044            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5045        }
5046    }
5047}
5048
5049/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5050///
5051/// The reverse of [`unplan_compute_replica_config`].
5052fn plan_compute_replica_config(
5053    introspection_interval: Option<OptionalDuration>,
5054    introspection_debugging: bool,
5055) -> Result<ComputeReplicaConfig, PlanError> {
5056    let introspection_interval = introspection_interval
5057        .map(|OptionalDuration(i)| i)
5058        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5059    let introspection = match introspection_interval {
5060        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5061            interval,
5062            debugging: introspection_debugging,
5063        }),
5064        None if introspection_debugging => {
5065            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5066        }
5067        None => None,
5068    };
5069    let compute = ComputeReplicaConfig { introspection };
5070    Ok(compute)
5071}
5072
5073/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5074///
5075/// The reverse of [`plan_compute_replica_config`].
5076fn unplan_compute_replica_config(
5077    compute_replica_config: ComputeReplicaConfig,
5078) -> (Option<OptionalDuration>, bool) {
5079    match compute_replica_config.introspection {
5080        Some(ComputeReplicaIntrospectionConfig {
5081            debugging,
5082            interval,
5083        }) => (Some(OptionalDuration(Some(interval))), debugging),
5084        None => (Some(OptionalDuration(None)), false),
5085    }
5086}
5087
5088/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5089///
5090/// The reverse of [`unplan_cluster_schedule`].
5091fn plan_cluster_schedule(
5092    schedule: ClusterScheduleOptionValue,
5093) -> Result<ClusterSchedule, PlanError> {
5094    Ok(match schedule {
5095        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5096        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5097        ClusterScheduleOptionValue::Refresh {
5098            hydration_time_estimate: None,
5099        } => ClusterSchedule::Refresh {
5100            hydration_time_estimate: Duration::from_millis(0),
5101        },
5102        // Otherwise we convert the `IntervalValue` to a `Duration`.
5103        ClusterScheduleOptionValue::Refresh {
5104            hydration_time_estimate: Some(interval_value),
5105        } => {
5106            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5107            if interval.as_microseconds() < 0 {
5108                sql_bail!(
5109                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5110                    interval
5111                );
5112            }
5113            if interval.months != 0 {
5114                // This limitation is because we want this interval to be cleanly convertable
5115                // to a unix epoch timestamp difference. When the interval involves months, then
5116                // this is not true anymore, because months have variable lengths.
5117                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5118            }
5119            let duration = interval.duration()?;
5120            if u64::try_from(duration.as_millis()).is_err()
5121                || Interval::from_duration(&duration).is_err()
5122            {
5123                sql_bail!("HYDRATION TIME ESTIMATE too large");
5124            }
5125            ClusterSchedule::Refresh {
5126                hydration_time_estimate: duration,
5127            }
5128        }
5129    })
5130}
5131
5132/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5133///
5134/// The reverse of [`plan_cluster_schedule`].
5135fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5136    match schedule {
5137        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5138        ClusterSchedule::Refresh {
5139            hydration_time_estimate,
5140        } => {
5141            let interval = Interval::from_duration(&hydration_time_estimate)
5142                .expect("planning ensured that this is convertible back to Interval");
5143            let interval_value = literal::unplan_interval(&interval);
5144            ClusterScheduleOptionValue::Refresh {
5145                hydration_time_estimate: Some(interval_value),
5146            }
5147        }
5148    }
5149}
5150
5151pub fn describe_create_cluster_replica(
5152    _: &StatementContext,
5153    _: CreateClusterReplicaStatement<Aug>,
5154) -> Result<StatementDesc, PlanError> {
5155    Ok(StatementDesc::new(None))
5156}
5157
5158pub fn plan_create_cluster_replica(
5159    scx: &StatementContext,
5160    CreateClusterReplicaStatement {
5161        definition: ReplicaDefinition { name, options },
5162        of_cluster,
5163    }: CreateClusterReplicaStatement<Aug>,
5164) -> Result<Plan, PlanError> {
5165    let cluster = scx
5166        .catalog
5167        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5168
5169    let config = plan_replica_config(scx, options)?;
5170
5171    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5172        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5173            return Err(PlanError::MangedReplicaName(name.into_string()));
5174        }
5175    } else {
5176        ensure_cluster_is_not_managed(scx, cluster.id())?;
5177    }
5178
5179    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5180        name: normalize::ident(name),
5181        cluster_id: cluster.id(),
5182        config,
5183    }))
5184}
5185
5186pub fn describe_create_secret(
5187    _: &StatementContext,
5188    _: CreateSecretStatement<Aug>,
5189) -> Result<StatementDesc, PlanError> {
5190    Ok(StatementDesc::new(None))
5191}
5192
5193pub fn plan_create_secret(
5194    scx: &StatementContext,
5195    stmt: CreateSecretStatement<Aug>,
5196) -> Result<Plan, PlanError> {
5197    let CreateSecretStatement {
5198        name,
5199        if_not_exists,
5200        value,
5201    } = &stmt;
5202
5203    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5204    let mut create_sql_statement = stmt.clone();
5205    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5206    let create_sql =
5207        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5208    let secret_as = query::plan_secret_as(scx, value.clone())?;
5209
5210    let secret = Secret {
5211        create_sql,
5212        secret_as,
5213    };
5214
5215    Ok(Plan::CreateSecret(CreateSecretPlan {
5216        name,
5217        secret,
5218        if_not_exists: *if_not_exists,
5219    }))
5220}
5221
5222pub fn describe_create_connection(
5223    _: &StatementContext,
5224    _: CreateConnectionStatement<Aug>,
5225) -> Result<StatementDesc, PlanError> {
5226    Ok(StatementDesc::new(None))
5227}
5228
// Typed extraction for `CreateConnectionOption`: the only recognized option is `Validate`, which
// `plan_create_connection` reads as an optional boolean.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5230
5231pub fn plan_create_connection(
5232    scx: &StatementContext,
5233    mut stmt: CreateConnectionStatement<Aug>,
5234) -> Result<Plan, PlanError> {
5235    let CreateConnectionStatement {
5236        name,
5237        connection_type,
5238        values,
5239        if_not_exists,
5240        with_options,
5241    } = stmt.clone();
5242    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5243    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5244    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5245
5246    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5247    if options.validate.is_some() {
5248        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5249    }
5250    let validate = match options.validate {
5251        Some(val) => val,
5252        None => {
5253            scx.catalog
5254                .system_vars()
5255                .enable_default_connection_validation()
5256                && details.to_connection().validate_by_default()
5257        }
5258    };
5259
5260    // Check for an object in the catalog with this same name
5261    let full_name = scx.catalog.resolve_full_name(&name);
5262    let partial_name = PartialItemName::from(full_name.clone());
5263    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5264        return Err(PlanError::ItemAlreadyExists {
5265            name: full_name.to_string(),
5266            item_type: item.item_type(),
5267        });
5268    }
5269
5270    // For SSH connections, overwrite the public key options based on the
5271    // connection details, in case we generated new keys during planning.
5272    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5273        stmt.values.retain(|v| {
5274            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5275        });
5276        stmt.values.push(ConnectionOption {
5277            name: ConnectionOptionName::PublicKey1,
5278            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5279        });
5280        stmt.values.push(ConnectionOption {
5281            name: ConnectionOptionName::PublicKey2,
5282            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5283        });
5284    }
5285    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5286
5287    let plan = CreateConnectionPlan {
5288        name,
5289        if_not_exists,
5290        connection: crate::plan::Connection {
5291            create_sql,
5292            details,
5293        },
5294        validate,
5295    };
5296    Ok(Plan::CreateConnection(plan))
5297}
5298
5299fn plan_drop_database(
5300    scx: &StatementContext,
5301    if_exists: bool,
5302    name: &UnresolvedDatabaseName,
5303    cascade: bool,
5304) -> Result<Option<DatabaseId>, PlanError> {
5305    Ok(match resolve_database(scx, name, if_exists)? {
5306        Some(database) => {
5307            if !cascade && database.has_schemas() {
5308                sql_bail!(
5309                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5310                    name,
5311                );
5312            }
5313            Some(database.id())
5314        }
5315        None => None,
5316    })
5317}
5318
5319pub fn describe_drop_objects(
5320    _: &StatementContext,
5321    _: DropObjectsStatement,
5322) -> Result<StatementDesc, PlanError> {
5323    Ok(StatementDesc::new(None))
5324}
5325
5326pub fn plan_drop_objects(
5327    scx: &mut StatementContext,
5328    DropObjectsStatement {
5329        object_type,
5330        if_exists,
5331        names,
5332        cascade,
5333    }: DropObjectsStatement,
5334) -> Result<Plan, PlanError> {
5335    if object_type == mz_sql_parser::ast::ObjectType::Func {
5336        bail_unsupported!("DROP FUNCTION");
5337    }
5338    let object_type = object_type.into();
5339
5340    let mut referenced_ids = Vec::new();
5341    for name in names {
5342        let id = match &name {
5343            UnresolvedObjectName::Cluster(name) => {
5344                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5345            }
5346            UnresolvedObjectName::ClusterReplica(name) => {
5347                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5348            }
5349            UnresolvedObjectName::Database(name) => {
5350                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5351            }
5352            UnresolvedObjectName::Schema(name) => {
5353                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5354            }
5355            UnresolvedObjectName::Role(name) => {
5356                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5357            }
5358            UnresolvedObjectName::Item(name) => {
5359                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5360                    .map(ObjectId::Item)
5361            }
5362            UnresolvedObjectName::NetworkPolicy(name) => {
5363                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5364            }
5365        };
5366        match id {
5367            Some(id) => referenced_ids.push(id),
5368            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5369                name: name.to_ast_string_simple(),
5370                object_type,
5371            }),
5372        }
5373    }
5374    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5375
5376    Ok(Plan::DropObjects(DropObjectsPlan {
5377        referenced_ids,
5378        drop_ids,
5379        object_type,
5380    }))
5381}
5382
5383fn plan_drop_schema(
5384    scx: &StatementContext,
5385    if_exists: bool,
5386    name: &UnresolvedSchemaName,
5387    cascade: bool,
5388) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5389    // Special case for mz_temp: with lazy temporary schema creation, the temp
5390    // schema may not exist yet, but we still need to return the correct error.
5391    // Check the schema name directly against MZ_TEMP_SCHEMA.
5392    let normalized = normalize::unresolved_schema_name(name.clone())?;
5393    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5394        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5395    }
5396
5397    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5398        Some((database_spec, schema_spec)) => {
5399            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5400                sql_bail!(
5401                    "cannot drop schema {name} because it is required by the database system",
5402                );
5403            }
5404            if let SchemaSpecifier::Temporary = schema_spec {
5405                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5406            }
5407            let schema = scx.get_schema(&database_spec, &schema_spec);
5408            if !cascade && schema.has_items() {
5409                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5410                sql_bail!(
5411                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5412                    full_schema_name
5413                );
5414            }
5415            Some((database_spec, schema_spec))
5416        }
5417        None => None,
5418    })
5419}
5420
5421fn plan_drop_role(
5422    scx: &StatementContext,
5423    if_exists: bool,
5424    name: &Ident,
5425) -> Result<Option<RoleId>, PlanError> {
5426    match scx.catalog.resolve_role(name.as_str()) {
5427        Ok(role) => {
5428            let id = role.id();
5429            if &id == scx.catalog.active_role_id() {
5430                sql_bail!("current role cannot be dropped");
5431            }
5432            for role in scx.catalog.get_roles() {
5433                for (member_id, grantor_id) in role.membership() {
5434                    if &id == grantor_id {
5435                        let member_role = scx.catalog.get_role(member_id);
5436                        sql_bail!(
5437                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5438                            name.as_str(),
5439                            role.name(),
5440                            member_role.name()
5441                        );
5442                    }
5443                }
5444            }
5445            Ok(Some(role.id()))
5446        }
5447        Err(_) if if_exists => Ok(None),
5448        Err(e) => Err(e.into()),
5449    }
5450}
5451
5452fn plan_drop_cluster(
5453    scx: &StatementContext,
5454    if_exists: bool,
5455    name: &Ident,
5456    cascade: bool,
5457) -> Result<Option<ClusterId>, PlanError> {
5458    Ok(match resolve_cluster(scx, name, if_exists)? {
5459        Some(cluster) => {
5460            if !cascade && !cluster.bound_objects().is_empty() {
5461                return Err(PlanError::DependentObjectsStillExist {
5462                    object_type: "cluster".to_string(),
5463                    object_name: cluster.name().to_string(),
5464                    dependents: Vec::new(),
5465                });
5466            }
5467            Some(cluster.id())
5468        }
5469        None => None,
5470    })
5471}
5472
5473fn plan_drop_network_policy(
5474    scx: &StatementContext,
5475    if_exists: bool,
5476    name: &Ident,
5477) -> Result<Option<NetworkPolicyId>, PlanError> {
5478    match scx.catalog.resolve_network_policy(name.as_str()) {
5479        Ok(policy) => {
5480            // TODO(network_policy): When we support role based network policies, check if any role
5481            // currently has the specified policy set.
5482            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5483                Err(PlanError::NetworkPolicyInUse)
5484            } else {
5485                Ok(Some(policy.id()))
5486            }
5487        }
5488        Err(_) if if_exists => Ok(None),
5489        Err(e) => Err(e.into()),
5490    }
5491}
5492
5493fn plan_drop_cluster_replica(
5494    scx: &StatementContext,
5495    if_exists: bool,
5496    name: &QualifiedReplica,
5497) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5498    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5499    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5500}
5501
5502/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5503fn plan_drop_item(
5504    scx: &StatementContext,
5505    object_type: ObjectType,
5506    if_exists: bool,
5507    name: UnresolvedItemName,
5508    cascade: bool,
5509) -> Result<Option<CatalogItemId>, PlanError> {
5510    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5511        Ok(r) => r,
5512        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5513        Err(PlanError::MismatchedObjectType {
5514            name,
5515            is_type: ObjectType::MaterializedView,
5516            expected_type: ObjectType::View,
5517        }) => {
5518            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5519        }
5520        e => e?,
5521    };
5522
5523    Ok(match resolved {
5524        Some(catalog_item) => {
5525            if catalog_item.id().is_system() {
5526                sql_bail!(
5527                    "cannot drop {} {} because it is required by the database system",
5528                    catalog_item.item_type(),
5529                    scx.catalog.minimal_qualification(catalog_item.name()),
5530                );
5531            }
5532
5533            if !cascade {
5534                for id in catalog_item.used_by() {
5535                    let dep = scx.catalog.get_item(id);
5536                    if dependency_prevents_drop(object_type, dep) {
5537                        return Err(PlanError::DependentObjectsStillExist {
5538                            object_type: catalog_item.item_type().to_string(),
5539                            object_name: scx
5540                                .catalog
5541                                .minimal_qualification(catalog_item.name())
5542                                .to_string(),
5543                            dependents: vec![(
5544                                dep.item_type().to_string(),
5545                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5546                            )],
5547                        });
5548                    }
5549                }
5550                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5551                //  relies on entry. Unfortunately, we don't have that information readily available.
5552            }
5553            Some(catalog_item.id())
5554        }
5555        None => None,
5556    })
5557}
5558
5559/// Does the dependency `dep` prevent a drop of a non-cascade query?
5560fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5561    match object_type {
5562        ObjectType::Type => true,
5563        ObjectType::Table
5564        | ObjectType::View
5565        | ObjectType::MaterializedView
5566        | ObjectType::Source
5567        | ObjectType::Sink
5568        | ObjectType::Index
5569        | ObjectType::Role
5570        | ObjectType::Cluster
5571        | ObjectType::ClusterReplica
5572        | ObjectType::Secret
5573        | ObjectType::Connection
5574        | ObjectType::Database
5575        | ObjectType::Schema
5576        | ObjectType::Func
5577        | ObjectType::NetworkPolicy => match dep.item_type() {
5578            CatalogItemType::Func
5579            | CatalogItemType::Table
5580            | CatalogItemType::Source
5581            | CatalogItemType::View
5582            | CatalogItemType::MaterializedView
5583            | CatalogItemType::Sink
5584            | CatalogItemType::Type
5585            | CatalogItemType::Secret
5586            | CatalogItemType::Connection => true,
5587            CatalogItemType::Index => false,
5588        },
5589    }
5590}
5591
5592pub fn describe_alter_index_options(
5593    _: &StatementContext,
5594    _: AlterIndexStatement<Aug>,
5595) -> Result<StatementDesc, PlanError> {
5596    Ok(StatementDesc::new(None))
5597}
5598
5599pub fn describe_drop_owned(
5600    _: &StatementContext,
5601    _: DropOwnedStatement<Aug>,
5602) -> Result<StatementDesc, PlanError> {
5603    Ok(StatementDesc::new(None))
5604}
5605
5606pub fn plan_drop_owned(
5607    scx: &StatementContext,
5608    drop: DropOwnedStatement<Aug>,
5609) -> Result<Plan, PlanError> {
5610    let cascade = drop.cascade();
5611    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5612    let mut drop_ids = Vec::new();
5613    let mut privilege_revokes = Vec::new();
5614    let mut default_privilege_revokes = Vec::new();
5615
5616    fn update_privilege_revokes(
5617        object_id: SystemObjectId,
5618        privileges: &PrivilegeMap,
5619        role_ids: &BTreeSet<RoleId>,
5620        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5621    ) {
5622        privilege_revokes.extend(iter::zip(
5623            iter::repeat(object_id),
5624            privileges
5625                .all_values()
5626                .filter(|privilege| role_ids.contains(&privilege.grantee))
5627                .cloned(),
5628        ));
5629    }
5630
5631    // Replicas
5632    for replica in scx.catalog.get_cluster_replicas() {
5633        if role_ids.contains(&replica.owner_id()) {
5634            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5635        }
5636    }
5637
5638    // Clusters
5639    for cluster in scx.catalog.get_clusters() {
5640        if role_ids.contains(&cluster.owner_id()) {
5641            // Note: CASCADE is not required for replicas.
5642            if !cascade {
5643                let non_owned_bound_objects: Vec<_> = cluster
5644                    .bound_objects()
5645                    .into_iter()
5646                    .map(|item_id| scx.catalog.get_item(item_id))
5647                    .filter(|item| !role_ids.contains(&item.owner_id()))
5648                    .collect();
5649                if !non_owned_bound_objects.is_empty() {
5650                    let names: Vec<_> = non_owned_bound_objects
5651                        .into_iter()
5652                        .map(|item| {
5653                            (
5654                                item.item_type().to_string(),
5655                                scx.catalog.resolve_full_name(item.name()).to_string(),
5656                            )
5657                        })
5658                        .collect();
5659                    return Err(PlanError::DependentObjectsStillExist {
5660                        object_type: "cluster".to_string(),
5661                        object_name: cluster.name().to_string(),
5662                        dependents: names,
5663                    });
5664                }
5665            }
5666            drop_ids.push(cluster.id().into());
5667        }
5668        update_privilege_revokes(
5669            SystemObjectId::Object(cluster.id().into()),
5670            cluster.privileges(),
5671            &role_ids,
5672            &mut privilege_revokes,
5673        );
5674    }
5675
5676    // Items
5677    for item in scx.catalog.get_items() {
5678        if role_ids.contains(&item.owner_id()) {
5679            if !cascade {
5680                // Checks if any items still depend on this one, returning an error if so.
5681                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5682                    let non_owned_dependencies: Vec<_> = used_by
5683                        .into_iter()
5684                        .map(|item_id| scx.catalog.get_item(item_id))
5685                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5686                        .filter(|item| !role_ids.contains(&item.owner_id()))
5687                        .collect();
5688                    if !non_owned_dependencies.is_empty() {
5689                        let names: Vec<_> = non_owned_dependencies
5690                            .into_iter()
5691                            .map(|item| {
5692                                let item_typ = item.item_type().to_string();
5693                                let item_name =
5694                                    scx.catalog.resolve_full_name(item.name()).to_string();
5695                                (item_typ, item_name)
5696                            })
5697                            .collect();
5698                        Err(PlanError::DependentObjectsStillExist {
5699                            object_type: item.item_type().to_string(),
5700                            object_name: scx
5701                                .catalog
5702                                .resolve_full_name(item.name())
5703                                .to_string()
5704                                .to_string(),
5705                            dependents: names,
5706                        })
5707                    } else {
5708                        Ok(())
5709                    }
5710                };
5711
5712                // When this item gets dropped it will also drop its progress source, so we need to
5713                // check the users of those.
5714                if let Some(id) = item.progress_id() {
5715                    let progress_item = scx.catalog.get_item(&id);
5716                    check_if_dependents_exist(progress_item.used_by())?;
5717                }
5718                check_if_dependents_exist(item.used_by())?;
5719            }
5720            drop_ids.push(item.id().into());
5721        }
5722        update_privilege_revokes(
5723            SystemObjectId::Object(item.id().into()),
5724            item.privileges(),
5725            &role_ids,
5726            &mut privilege_revokes,
5727        );
5728    }
5729
5730    // Schemas
5731    for schema in scx.catalog.get_schemas() {
5732        if !schema.id().is_temporary() {
5733            if role_ids.contains(&schema.owner_id()) {
5734                if !cascade {
5735                    let non_owned_dependencies: Vec<_> = schema
5736                        .item_ids()
5737                        .map(|item_id| scx.catalog.get_item(&item_id))
5738                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5739                        .filter(|item| !role_ids.contains(&item.owner_id()))
5740                        .collect();
5741                    if !non_owned_dependencies.is_empty() {
5742                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5743                        sql_bail!(
5744                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5745                            full_schema_name.to_string().quoted()
5746                        );
5747                    }
5748                }
5749                drop_ids.push((*schema.database(), *schema.id()).into())
5750            }
5751            update_privilege_revokes(
5752                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5753                schema.privileges(),
5754                &role_ids,
5755                &mut privilege_revokes,
5756            );
5757        }
5758    }
5759
5760    // Databases
5761    for database in scx.catalog.get_databases() {
5762        if role_ids.contains(&database.owner_id()) {
5763            if !cascade {
5764                let non_owned_schemas: Vec<_> = database
5765                    .schemas()
5766                    .into_iter()
5767                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5768                    .collect();
5769                if !non_owned_schemas.is_empty() {
5770                    sql_bail!(
5771                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5772                        database.name().quoted(),
5773                    );
5774                }
5775            }
5776            drop_ids.push(database.id().into());
5777        }
5778        update_privilege_revokes(
5779            SystemObjectId::Object(database.id().into()),
5780            database.privileges(),
5781            &role_ids,
5782            &mut privilege_revokes,
5783        );
5784    }
5785
5786    // Network policies
5787    for network_policy in scx.catalog.get_network_policies() {
5788        if role_ids.contains(&network_policy.owner_id()) {
5789            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
5790        }
5791        update_privilege_revokes(
5792            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
5793            network_policy.privileges(),
5794            &role_ids,
5795            &mut privilege_revokes,
5796        );
5797    }
5798
5799    // System
5800    update_privilege_revokes(
5801        SystemObjectId::System,
5802        scx.catalog.get_system_privileges(),
5803        &role_ids,
5804        &mut privilege_revokes,
5805    );
5806
5807    for (default_privilege_object, default_privilege_acl_items) in
5808        scx.catalog.get_default_privileges()
5809    {
5810        for default_privilege_acl_item in default_privilege_acl_items {
5811            if role_ids.contains(&default_privilege_object.role_id)
5812                || role_ids.contains(&default_privilege_acl_item.grantee)
5813            {
5814                default_privilege_revokes.push((
5815                    default_privilege_object.clone(),
5816                    default_privilege_acl_item.clone(),
5817                ));
5818            }
5819        }
5820    }
5821
5822    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5823
5824    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5825    if !system_ids.is_empty() {
5826        let mut owners = system_ids
5827            .into_iter()
5828            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5829            .collect::<BTreeSet<_>>()
5830            .into_iter()
5831            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5832        sql_bail!(
5833            "cannot drop objects owned by role {} because they are required by the database system",
5834            owners.join(", "),
5835        );
5836    }
5837
5838    Ok(Plan::DropOwned(DropOwnedPlan {
5839        role_ids: role_ids.into_iter().collect(),
5840        drop_ids,
5841        privilege_revokes,
5842        default_privilege_revokes,
5843    }))
5844}
5845
5846fn plan_retain_history_option(
5847    scx: &StatementContext,
5848    retain_history: Option<OptionalDuration>,
5849) -> Result<Option<CompactionWindow>, PlanError> {
5850    if let Some(OptionalDuration(lcw)) = retain_history {
5851        Ok(Some(plan_retain_history(scx, lcw)?))
5852    } else {
5853        Ok(None)
5854    }
5855}
5856
5857// Convert a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5858// `DisableCompaction`. A zero duration will error. This is because the `OptionalDuration` type
5859// already converts the zero duration into `None`. This function must not be called in the `RESET
5860// (RETAIN HISTORY)` path, which should be handled by the outer `Option<OptionalDuration>` being
5861// `None`.
5862fn plan_retain_history(
5863    scx: &StatementContext,
5864    lcw: Option<Duration>,
5865) -> Result<CompactionWindow, PlanError> {
5866    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5867    match lcw {
5868        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
5869        // disable compaction), and should never occur here. Furthermore, some things actually do
5870        // break when this is set to real zero:
5871        // https://github.com/MaterializeInc/database-issues/issues/3798.
5872        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5873            option_name: "RETAIN HISTORY".to_string(),
5874            err: Box::new(PlanError::Unstructured(
5875                "internal error: unexpectedly zero".to_string(),
5876            )),
5877        }),
5878        Some(duration) => {
5879            // Error if the duration is low and enable_unlimited_retain_history is not set (which
5880            // should only be possible during testing).
5881            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5882                && scx
5883                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5884                    .is_err()
5885            {
5886                return Err(PlanError::RetainHistoryLow {
5887                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5888                });
5889            }
5890            Ok(duration.try_into()?)
5891        }
5892        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction seems
5893        // to be a bad choice, so prevent it.
5894        None => {
5895            if scx
5896                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5897                .is_err()
5898            {
5899                Err(PlanError::RetainHistoryRequired)
5900            } else {
5901                Ok(CompactionWindow::DisableCompaction)
5902            }
5903        }
5904    }
5905}
5906
// Generates `IndexOptionExtracted`, which `plan_index_options` obtains from a
// `Vec<IndexOption<Aug>>` via `try_into` and which carries the `RETAIN HISTORY` value in its
// `retain_history` field.
generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5908
5909fn plan_index_options(
5910    scx: &StatementContext,
5911    with_opts: Vec<IndexOption<Aug>>,
5912) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5913    if !with_opts.is_empty() {
5914        // Index options are not durable.
5915        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5916    }
5917
5918    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5919
5920    let mut out = Vec::with_capacity(1);
5921    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5922        out.push(crate::plan::IndexOption::RetainHistory(cw));
5923    }
5924    Ok(out)
5925}
5926
// Generates `TableOptionExtracted`, which `plan_table_options` obtains from a
// `Vec<TableOption<Aug>>` via `try_into` and which exposes the option values as the
// `partition_by`, `retain_history`, and `redacted_test` fields.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
5933
5934fn plan_table_options(
5935    scx: &StatementContext,
5936    desc: &RelationDesc,
5937    with_opts: Vec<TableOption<Aug>>,
5938) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5939    let TableOptionExtracted {
5940        partition_by,
5941        retain_history,
5942        redacted_test,
5943        ..
5944    }: TableOptionExtracted = with_opts.try_into()?;
5945
5946    if let Some(partition_by) = partition_by {
5947        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5948        check_partition_by(desc, partition_by)?;
5949    }
5950
5951    if redacted_test.is_some() {
5952        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5953    }
5954
5955    let mut out = Vec::with_capacity(1);
5956    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5957        out.push(crate::plan::TableOption::RetainHistory(cw));
5958    }
5959    Ok(out)
5960}
5961
5962pub fn plan_alter_index_options(
5963    scx: &mut StatementContext,
5964    AlterIndexStatement {
5965        index_name,
5966        if_exists,
5967        action,
5968    }: AlterIndexStatement<Aug>,
5969) -> Result<Plan, PlanError> {
5970    let object_type = ObjectType::Index;
5971    match action {
5972        AlterIndexAction::ResetOptions(options) => {
5973            let mut options = options.into_iter();
5974            if let Some(opt) = options.next() {
5975                match opt {
5976                    IndexOptionName::RetainHistory => {
5977                        if options.next().is_some() {
5978                            sql_bail!("RETAIN HISTORY must be only option");
5979                        }
5980                        return alter_retain_history(
5981                            scx,
5982                            object_type,
5983                            if_exists,
5984                            UnresolvedObjectName::Item(index_name),
5985                            None,
5986                        );
5987                    }
5988                }
5989            }
5990            sql_bail!("expected option");
5991        }
5992        AlterIndexAction::SetOptions(options) => {
5993            let mut options = options.into_iter();
5994            if let Some(opt) = options.next() {
5995                match opt.name {
5996                    IndexOptionName::RetainHistory => {
5997                        if options.next().is_some() {
5998                            sql_bail!("RETAIN HISTORY must be only option");
5999                        }
6000                        return alter_retain_history(
6001                            scx,
6002                            object_type,
6003                            if_exists,
6004                            UnresolvedObjectName::Item(index_name),
6005                            opt.value,
6006                        );
6007                    }
6008                }
6009            }
6010            sql_bail!("expected option");
6011        }
6012    }
6013}
6014
6015pub fn describe_alter_cluster_set_options(
6016    _: &StatementContext,
6017    _: AlterClusterStatement<Aug>,
6018) -> Result<StatementDesc, PlanError> {
6019    Ok(StatementDesc::new(None))
6020}
6021
6022pub fn plan_alter_cluster(
6023    scx: &mut StatementContext,
6024    AlterClusterStatement {
6025        name,
6026        action,
6027        if_exists,
6028    }: AlterClusterStatement<Aug>,
6029) -> Result<Plan, PlanError> {
6030    let cluster = match resolve_cluster(scx, &name, if_exists)? {
6031        Some(entry) => entry,
6032        None => {
6033            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6034                name: name.to_ast_string_simple(),
6035                object_type: ObjectType::Cluster,
6036            });
6037
6038            return Ok(Plan::AlterNoop(AlterNoopPlan {
6039                object_type: ObjectType::Cluster,
6040            }));
6041        }
6042    };
6043
6044    let mut options: PlanClusterOption = Default::default();
6045    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6046
6047    match action {
6048        AlterClusterAction::SetOptions {
6049            options: set_options,
6050            with_options,
6051        } => {
6052            let ClusterOptionExtracted {
6053                availability_zones,
6054                introspection_debugging,
6055                introspection_interval,
6056                managed,
6057                replicas: replica_defs,
6058                replication_factor,
6059                seen: _,
6060                size,
6061                disk,
6062                schedule,
6063                workload_class,
6064            }: ClusterOptionExtracted = set_options.try_into()?;
6065
6066            if !scx.catalog.active_role_id().is_system() {
6067                if workload_class.is_some() {
6068                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6069                }
6070            }
6071
6072            match managed.unwrap_or_else(|| cluster.is_managed()) {
6073                true => {
6074                    let alter_strategy_extracted =
6075                        ClusterAlterOptionExtracted::try_from(with_options)?;
6076                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6077
6078                    match alter_strategy {
6079                        AlterClusterPlanStrategy::None => {}
6080                        _ => {
6081                            scx.require_feature_flag(
6082                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6083                            )?;
6084                        }
6085                    }
6086
6087                    if replica_defs.is_some() {
6088                        sql_bail!("REPLICAS not supported for managed clusters");
6089                    }
6090                    if schedule.is_some()
6091                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6092                    {
6093                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6094                    }
6095
6096                    if replication_factor.is_some() {
6097                        if schedule.is_some()
6098                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6099                        {
6100                            sql_bail!(
6101                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6102                            );
6103                        }
6104                        if let Some(current_schedule) = cluster.schedule() {
6105                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6106                                sql_bail!(
6107                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6108                                );
6109                            }
6110                        }
6111                    }
6112                }
6113                false => {
6114                    if !alter_strategy.is_none() {
6115                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6116                    }
6117                    if availability_zones.is_some() {
6118                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6119                    }
6120                    if replication_factor.is_some() {
6121                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6122                    }
6123                    if introspection_debugging.is_some() {
6124                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6125                    }
6126                    if introspection_interval.is_some() {
6127                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6128                    }
6129                    if size.is_some() {
6130                        sql_bail!("SIZE not supported for unmanaged clusters");
6131                    }
6132                    if disk.is_some() {
6133                        sql_bail!("DISK not supported for unmanaged clusters");
6134                    }
6135                    if schedule.is_some()
6136                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6137                    {
6138                        sql_bail!(
6139                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6140                        );
6141                    }
6142                    if let Some(current_schedule) = cluster.schedule() {
6143                        if !matches!(current_schedule, ClusterSchedule::Manual)
6144                            && schedule.is_none()
6145                        {
6146                            sql_bail!(
6147                                "when switching a cluster to unmanaged, if the managed \
6148                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6149                                explicitly set the SCHEDULE to MANUAL"
6150                            );
6151                        }
6152                    }
6153                }
6154            }
6155
6156            let mut replicas = vec![];
6157            for ReplicaDefinition { name, options } in
6158                replica_defs.into_iter().flat_map(Vec::into_iter)
6159            {
6160                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6161            }
6162
6163            if let Some(managed) = managed {
6164                options.managed = AlterOptionParameter::Set(managed);
6165            }
6166            if let Some(replication_factor) = replication_factor {
6167                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6168            }
6169            if let Some(size) = &size {
6170                options.size = AlterOptionParameter::Set(size.clone());
6171            }
6172            if let Some(availability_zones) = availability_zones {
6173                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6174            }
6175            if let Some(introspection_debugging) = introspection_debugging {
6176                options.introspection_debugging =
6177                    AlterOptionParameter::Set(introspection_debugging);
6178            }
6179            if let Some(introspection_interval) = introspection_interval {
6180                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6181            }
6182            if disk.is_some() {
6183                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6184                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
6185                // we'll be able to remove the `DISK` option entirely.
6186                let size = match size.as_deref() {
6187                    Some(s) => s,
6188                    None => cluster
6189                        .managed_size()
6190                        .ok_or_else(|| sql_err!("cluster is not managed"))?,
6191                };
6192                if scx.catalog.is_cluster_size_cc(size) {
6193                    sql_bail!(
6194                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6195                    );
6196                }
6197
6198                scx.catalog
6199                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6200            }
6201            if !replicas.is_empty() {
6202                options.replicas = AlterOptionParameter::Set(replicas);
6203            }
6204            if let Some(schedule) = schedule {
6205                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6206            }
6207            if let Some(workload_class) = workload_class {
6208                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6209            }
6210        }
6211        AlterClusterAction::ResetOptions(reset_options) => {
6212            use AlterOptionParameter::Reset;
6213            use ClusterOptionName::*;
6214
6215            if !scx.catalog.active_role_id().is_system() {
6216                if reset_options.contains(&WorkloadClass) {
6217                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6218                }
6219            }
6220
6221            for option in reset_options {
6222                match option {
6223                    AvailabilityZones => options.availability_zones = Reset,
6224                    Disk => scx
6225                        .catalog
6226                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6227                    IntrospectionInterval => options.introspection_interval = Reset,
6228                    IntrospectionDebugging => options.introspection_debugging = Reset,
6229                    Managed => options.managed = Reset,
6230                    Replicas => options.replicas = Reset,
6231                    ReplicationFactor => options.replication_factor = Reset,
6232                    Size => options.size = Reset,
6233                    Schedule => options.schedule = Reset,
6234                    WorkloadClass => options.workload_class = Reset,
6235                }
6236            }
6237        }
6238    }
6239    Ok(Plan::AlterCluster(AlterClusterPlan {
6240        id: cluster.id(),
6241        name: cluster.name().to_string(),
6242        options,
6243        strategy: alter_strategy,
6244    }))
6245}
6246
6247pub fn describe_alter_set_cluster(
6248    _: &StatementContext,
6249    _: AlterSetClusterStatement<Aug>,
6250) -> Result<StatementDesc, PlanError> {
6251    Ok(StatementDesc::new(None))
6252}
6253
6254pub fn plan_alter_item_set_cluster(
6255    scx: &StatementContext,
6256    AlterSetClusterStatement {
6257        if_exists,
6258        set_cluster: in_cluster_name,
6259        name,
6260        object_type,
6261    }: AlterSetClusterStatement<Aug>,
6262) -> Result<Plan, PlanError> {
6263    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6264
6265    let object_type = object_type.into();
6266
6267    // Prevent access to `SET CLUSTER` for unsupported objects.
6268    match object_type {
6269        ObjectType::MaterializedView => {}
6270        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6271            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6272        }
6273        ObjectType::Table
6274        | ObjectType::View
6275        | ObjectType::Type
6276        | ObjectType::Role
6277        | ObjectType::Cluster
6278        | ObjectType::ClusterReplica
6279        | ObjectType::Secret
6280        | ObjectType::Connection
6281        | ObjectType::Database
6282        | ObjectType::Schema
6283        | ObjectType::Func
6284        | ObjectType::NetworkPolicy => {
6285            bail_never_supported!(
6286                format!("ALTER {object_type} SET CLUSTER"),
6287                "sql/alter-set-cluster/",
6288                format!("{object_type} has no associated cluster")
6289            )
6290        }
6291    }
6292
6293    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6294
6295    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6296        Some(entry) => {
6297            let current_cluster = entry.cluster_id();
6298            let Some(current_cluster) = current_cluster else {
6299                sql_bail!("No cluster associated with {name}");
6300            };
6301
6302            if current_cluster == in_cluster.id() {
6303                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6304            } else {
6305                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6306                    id: entry.id(),
6307                    set_cluster: in_cluster.id(),
6308                }))
6309            }
6310        }
6311        None => {
6312            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6313                name: name.to_ast_string_simple(),
6314                object_type,
6315            });
6316
6317            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6318        }
6319    }
6320}
6321
6322pub fn describe_alter_object_rename(
6323    _: &StatementContext,
6324    _: AlterObjectRenameStatement,
6325) -> Result<StatementDesc, PlanError> {
6326    Ok(StatementDesc::new(None))
6327}
6328
6329pub fn plan_alter_object_rename(
6330    scx: &mut StatementContext,
6331    AlterObjectRenameStatement {
6332        name,
6333        object_type,
6334        to_item_name,
6335        if_exists,
6336    }: AlterObjectRenameStatement,
6337) -> Result<Plan, PlanError> {
6338    let object_type = object_type.into();
6339    match (object_type, name) {
6340        (
6341            ObjectType::View
6342            | ObjectType::MaterializedView
6343            | ObjectType::Table
6344            | ObjectType::Source
6345            | ObjectType::Index
6346            | ObjectType::Sink
6347            | ObjectType::Secret
6348            | ObjectType::Connection,
6349            UnresolvedObjectName::Item(name),
6350        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6351        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6352            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6353        }
6354        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6355            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6356        }
6357        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6358            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6359        }
6360        (object_type, name) => {
6361            // The earlier dispatch + name resolution should make this
6362            // combination impossible.
6363            bail_internal!("invalid object type '{object_type}' for ALTER RENAME with name {name}")
6364        }
6365    }
6366}
6367
6368pub fn plan_alter_schema_rename(
6369    scx: &mut StatementContext,
6370    name: UnresolvedSchemaName,
6371    to_schema_name: Ident,
6372    if_exists: bool,
6373) -> Result<Plan, PlanError> {
6374    // Special case for mz_temp: with lazy temporary schema creation, the temp
6375    // schema may not exist yet, but we still need to return the correct error.
6376    // Check the schema name directly against MZ_TEMP_SCHEMA.
6377    let normalized = normalize::unresolved_schema_name(name.clone())?;
6378    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6379        sql_bail!(
6380            "cannot rename schemas in the ambient database: {:?}",
6381            mz_repr::namespaces::MZ_TEMP_SCHEMA
6382        );
6383    }
6384
6385    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6386        let object_type = ObjectType::Schema;
6387        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6388            name: name.to_ast_string_simple(),
6389            object_type,
6390        });
6391        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6392    };
6393
6394    // Make sure the name is unique.
6395    if scx
6396        .resolve_schema_in_database(&db_spec, &to_schema_name)
6397        .is_ok()
6398    {
6399        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6400            to_schema_name.clone().into_string(),
6401        )));
6402    }
6403
6404    // Prevent users from renaming system related schemas.
6405    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6406    if schema.id().is_system() {
6407        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6408    }
6409
6410    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6411        cur_schema_spec: (db_spec, schema_spec),
6412        new_schema_name: to_schema_name.into_string(),
6413    }))
6414}
6415
6416pub fn plan_alter_schema_swap<F>(
6417    scx: &mut StatementContext,
6418    name_a: UnresolvedSchemaName,
6419    name_b: Ident,
6420    if_exists: bool,
6421    gen_temp_suffix: F,
6422) -> Result<Plan, PlanError>
6423where
6424    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6425{
6426    // Special case for mz_temp: with lazy temporary schema creation, the temp
6427    // schema may not exist yet, but we still need to return the correct error.
6428    // Check the schema name directly against MZ_TEMP_SCHEMA.
6429    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6430    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6431    {
6432        sql_bail!("cannot swap schemas that are in the ambient database");
6433    }
6434    // Also check name_b (the target schema name)
6435    let name_b_str = normalize::ident_ref(&name_b);
6436    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6437        sql_bail!("cannot swap schemas that are in the ambient database");
6438    }
6439
6440    let schema_a = match scx.resolve_schema(name_a.clone()) {
6441        Ok(schema) => schema,
6442        Err(_) if if_exists => {
6443            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6444                name: name_a.to_ast_string_simple(),
6445                object_type: ObjectType::Schema,
6446            });
6447            return Ok(Plan::AlterNoop(AlterNoopPlan {
6448                object_type: ObjectType::Schema,
6449            }));
6450        }
6451        Err(e) => return Err(e),
6452    };
6453
6454    let db_spec = schema_a.database().clone();
6455    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6456        sql_bail!("cannot swap schemas that are in the ambient database");
6457    };
6458    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6459
6460    // We cannot swap system schemas.
6461    if schema_a.id().is_system() || schema_b.id().is_system() {
6462        bail_never_supported!("swapping a system schema".to_string())
6463    }
6464
6465    // Generate a temporary name we can swap schema_a to.
6466    //
6467    // 'check' returns if the temp schema name would be valid.
6468    const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
6469    let check = |temp_suffix: &str| {
6470        let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
6471        temp_name.append_lossy(temp_suffix);
6472        scx.resolve_schema_in_database(&db_spec, &temp_name)
6473            .is_err()
6474    };
6475    let temp_suffix = gen_temp_suffix(&check)?;
6476    let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");
6477
6478    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6479        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6480        schema_a_name: schema_a.name().schema.to_string(),
6481        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6482        schema_b_name: schema_b.name().schema.to_string(),
6483        name_temp,
6484    }))
6485}
6486
6487pub fn plan_alter_item_rename(
6488    scx: &mut StatementContext,
6489    object_type: ObjectType,
6490    name: UnresolvedItemName,
6491    to_item_name: Ident,
6492    if_exists: bool,
6493) -> Result<Plan, PlanError> {
6494    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6495        Ok(r) => r,
6496        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6497        Err(PlanError::MismatchedObjectType {
6498            name,
6499            is_type: ObjectType::MaterializedView,
6500            expected_type: ObjectType::View,
6501        }) => {
6502            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6503        }
6504        e => e?,
6505    };
6506
6507    match resolved {
6508        Some(entry) => {
6509            let full_name = scx.catalog.resolve_full_name(entry.name());
6510            let item_type = entry.item_type();
6511
6512            let proposed_name = QualifiedItemName {
6513                qualifiers: entry.name().qualifiers.clone(),
6514                item: to_item_name.clone().into_string(),
6515            };
6516
6517            // For PostgreSQL compatibility, items and types cannot have
6518            // overlapping names in a variety of situations. See the comment on
6519            // `CatalogItemType::conflicts_with_type` for details.
6520            let conflicting_type_exists;
6521            let conflicting_item_exists;
6522            if item_type == CatalogItemType::Type {
6523                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6524                conflicting_item_exists = scx
6525                    .catalog
6526                    .get_item_by_name(&proposed_name)
6527                    .map(|item| item.item_type().conflicts_with_type())
6528                    .unwrap_or(false);
6529            } else {
6530                conflicting_type_exists = item_type.conflicts_with_type()
6531                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6532                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6533            };
6534            if conflicting_type_exists || conflicting_item_exists {
6535                sql_bail!("catalog item '{}' already exists", to_item_name);
6536            }
6537
6538            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6539                id: entry.id(),
6540                current_full_name: full_name,
6541                to_name: normalize::ident(to_item_name),
6542                object_type,
6543            }))
6544        }
6545        None => {
6546            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6547                name: name.to_ast_string_simple(),
6548                object_type,
6549            });
6550
6551            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6552        }
6553    }
6554}
6555
6556pub fn plan_alter_cluster_rename(
6557    scx: &mut StatementContext,
6558    object_type: ObjectType,
6559    name: Ident,
6560    to_name: Ident,
6561    if_exists: bool,
6562) -> Result<Plan, PlanError> {
6563    match resolve_cluster(scx, &name, if_exists)? {
6564        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6565            id: entry.id(),
6566            name: entry.name().to_string(),
6567            to_name: ident(to_name),
6568        })),
6569        None => {
6570            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6571                name: name.to_ast_string_simple(),
6572                object_type,
6573            });
6574
6575            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6576        }
6577    }
6578}
6579
6580pub fn plan_alter_cluster_swap<F>(
6581    scx: &mut StatementContext,
6582    name_a: Ident,
6583    name_b: Ident,
6584    if_exists: bool,
6585    gen_temp_suffix: F,
6586) -> Result<Plan, PlanError>
6587where
6588    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6589{
6590    let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
6591        Ok(cluster) => cluster,
6592        Err(_) if if_exists => {
6593            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6594                name: name_a.to_ast_string_simple(),
6595                object_type: ObjectType::Cluster,
6596            });
6597            return Ok(Plan::AlterNoop(AlterNoopPlan {
6598                object_type: ObjectType::Cluster,
6599            }));
6600        }
6601        Err(e) => return Err(e),
6602    };
6603    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6604
6605    const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
6606    let check = |temp_suffix: &str| {
6607        let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
6608        temp_name.append_lossy(temp_suffix);
6609        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6610            // Temp name does not exist, so we can use it.
6611            Err(CatalogError::UnknownCluster(_)) => true,
6612            // Temp name already exists!
6613            Ok(_) | Err(_) => false,
6614        }
6615    };
6616    let temp_suffix = gen_temp_suffix(&check)?;
6617    let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");
6618
6619    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6620        id_a: cluster_a.id(),
6621        id_b: cluster_b.id(),
6622        name_a: name_a.into_string(),
6623        name_b: name_b.into_string(),
6624        name_temp,
6625    }))
6626}
6627
6628pub fn plan_alter_cluster_replica_rename(
6629    scx: &mut StatementContext,
6630    object_type: ObjectType,
6631    name: QualifiedReplica,
6632    to_item_name: Ident,
6633    if_exists: bool,
6634) -> Result<Plan, PlanError> {
6635    match resolve_cluster_replica(scx, &name, if_exists)? {
6636        Some((cluster, replica)) => {
6637            ensure_cluster_is_not_managed(scx, cluster.id())?;
6638            Ok(Plan::AlterClusterReplicaRename(
6639                AlterClusterReplicaRenamePlan {
6640                    cluster_id: cluster.id(),
6641                    replica_id: replica,
6642                    name: QualifiedReplica {
6643                        cluster: Ident::new(cluster.name())?,
6644                        replica: name.replica,
6645                    },
6646                    to_name: normalize::ident(to_item_name),
6647                },
6648            ))
6649        }
6650        None => {
6651            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6652                name: name.to_ast_string_simple(),
6653                object_type,
6654            });
6655
6656            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6657        }
6658    }
6659}
6660
6661pub fn describe_alter_object_swap(
6662    _: &StatementContext,
6663    _: AlterObjectSwapStatement,
6664) -> Result<StatementDesc, PlanError> {
6665    Ok(StatementDesc::new(None))
6666}
6667
6668pub fn plan_alter_object_swap(
6669    scx: &mut StatementContext,
6670    stmt: AlterObjectSwapStatement,
6671) -> Result<Plan, PlanError> {
6672    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6673
6674    let AlterObjectSwapStatement {
6675        object_type,
6676        if_exists,
6677        name_a,
6678        name_b,
6679    } = stmt;
6680    let object_type = object_type.into();
6681
6682    // We'll try 10 times to generate a temporary suffix.
6683    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6684        let mut attempts = 0;
6685        let name_temp = loop {
6686            attempts += 1;
6687            if attempts > 10 {
6688                tracing::warn!("Unable to generate temp id for swapping");
6689                sql_bail!("unable to swap!");
6690            }
6691
6692            // Call the provided closure to make sure this name is unique!
6693            let short_id = mz_ore::id_gen::temp_id();
6694            if check_fn(&short_id) {
6695                break short_id;
6696            }
6697        };
6698
6699        Ok(name_temp)
6700    };
6701
6702    match (object_type, name_a, name_b) {
6703        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6704            plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6705        }
6706        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6707            plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6708        }
6709        (ObjectType::Schema | ObjectType::Cluster, _, _) => {
6710            bail_internal!("name type does not match object type for ALTER SWAP")
6711        }
6712        (
6713            ObjectType::Table
6714            | ObjectType::View
6715            | ObjectType::MaterializedView
6716            | ObjectType::Source
6717            | ObjectType::Sink
6718            | ObjectType::Index
6719            | ObjectType::Type
6720            | ObjectType::Role
6721            | ObjectType::ClusterReplica
6722            | ObjectType::Secret
6723            | ObjectType::Connection
6724            | ObjectType::Database
6725            | ObjectType::Func
6726            | ObjectType::NetworkPolicy,
6727            _,
6728            _,
6729        ) => Err(PlanError::Unsupported {
6730            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6731            discussion_no: None,
6732        }),
6733    }
6734}
6735
6736pub fn describe_alter_retain_history(
6737    _: &StatementContext,
6738    _: AlterRetainHistoryStatement<Aug>,
6739) -> Result<StatementDesc, PlanError> {
6740    Ok(StatementDesc::new(None))
6741}
6742
6743pub fn plan_alter_retain_history(
6744    scx: &StatementContext,
6745    AlterRetainHistoryStatement {
6746        object_type,
6747        if_exists,
6748        name,
6749        history,
6750    }: AlterRetainHistoryStatement<Aug>,
6751) -> Result<Plan, PlanError> {
6752    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6753}
6754
6755fn alter_retain_history(
6756    scx: &StatementContext,
6757    object_type: ObjectType,
6758    if_exists: bool,
6759    name: UnresolvedObjectName,
6760    history: Option<WithOptionValue<Aug>>,
6761) -> Result<Plan, PlanError> {
6762    let name = match (object_type, name) {
6763        (
6764            // View gets a special error below.
6765            ObjectType::View
6766            | ObjectType::MaterializedView
6767            | ObjectType::Table
6768            | ObjectType::Source
6769            | ObjectType::Index,
6770            UnresolvedObjectName::Item(name),
6771        ) => name,
6772        (object_type, _) => {
6773            bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
6774        }
6775    };
6776    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6777        Some(entry) => {
6778            let full_name = scx.catalog.resolve_full_name(entry.name());
6779            let item_type = entry.item_type();
6780
6781            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6782            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6783                return Err(PlanError::AlterViewOnMaterializedView(
6784                    full_name.to_string(),
6785                ));
6786            } else if object_type == ObjectType::View {
6787                sql_bail!("{object_type} does not support RETAIN HISTORY")
6788            } else if object_type != item_type {
6789                sql_bail!(
6790                    "\"{}\" is a {} not a {}",
6791                    full_name,
6792                    entry.item_type(),
6793                    format!("{object_type}").to_lowercase()
6794                )
6795            }
6796
6797            // Save the original value so we can write it back down in the create_sql catalog item.
6798            let (value, lcw) = match &history {
6799                Some(WithOptionValue::RetainHistoryFor(value)) => {
6800                    let window = OptionalDuration::try_from_value(value.clone())?;
6801                    (Some(value.clone()), window.0)
6802                }
6803                // None is RESET, so use the default CW.
6804                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6805                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6806            };
6807            let window = plan_retain_history(scx, lcw)?;
6808
6809            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6810                id: entry.id(),
6811                value,
6812                window,
6813                object_type,
6814            }))
6815        }
6816        None => {
6817            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6818                name: name.to_ast_string_simple(),
6819                object_type,
6820            });
6821
6822            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6823        }
6824    }
6825}
6826
6827fn alter_source_timestamp_interval(
6828    scx: &StatementContext,
6829    if_exists: bool,
6830    source_name: UnresolvedItemName,
6831    value: Option<WithOptionValue<Aug>>,
6832) -> Result<Plan, PlanError> {
6833    let object_type = ObjectType::Source;
6834    match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
6835        Some(entry) => {
6836            let full_name = scx.catalog.resolve_full_name(entry.name());
6837            if entry.item_type() != CatalogItemType::Source {
6838                sql_bail!(
6839                    "\"{}\" is a {} not a {}",
6840                    full_name,
6841                    entry.item_type(),
6842                    format!("{object_type}").to_lowercase()
6843                )
6844            }
6845
6846            match value {
6847                Some(val) => {
6848                    let val = match val {
6849                        WithOptionValue::Value(v) => v,
6850                        _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
6851                    };
6852                    let duration = Duration::try_from_value(val.clone())?;
6853
6854                    let min = scx.catalog.system_vars().min_timestamp_interval();
6855                    let max = scx.catalog.system_vars().max_timestamp_interval();
6856                    if duration < min || duration > max {
6857                        return Err(PlanError::InvalidTimestampInterval {
6858                            min,
6859                            max,
6860                            requested: duration,
6861                        });
6862                    }
6863
6864                    Ok(Plan::AlterSourceTimestampInterval(
6865                        AlterSourceTimestampIntervalPlan {
6866                            id: entry.id(),
6867                            value: Some(val),
6868                            interval: duration,
6869                        },
6870                    ))
6871                }
6872                None => {
6873                    let interval = scx.catalog.system_vars().default_timestamp_interval();
6874                    Ok(Plan::AlterSourceTimestampInterval(
6875                        AlterSourceTimestampIntervalPlan {
6876                            id: entry.id(),
6877                            value: None,
6878                            interval,
6879                        },
6880                    ))
6881                }
6882            }
6883        }
6884        None => {
6885            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6886                name: source_name.to_ast_string_simple(),
6887                object_type,
6888            });
6889
6890            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6891        }
6892    }
6893}
6894
6895pub fn describe_alter_secret_options(
6896    _: &StatementContext,
6897    _: AlterSecretStatement<Aug>,
6898) -> Result<StatementDesc, PlanError> {
6899    Ok(StatementDesc::new(None))
6900}
6901
6902pub fn plan_alter_secret(
6903    scx: &mut StatementContext,
6904    stmt: AlterSecretStatement<Aug>,
6905) -> Result<Plan, PlanError> {
6906    let AlterSecretStatement {
6907        name,
6908        if_exists,
6909        value,
6910    } = stmt;
6911    let object_type = ObjectType::Secret;
6912    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6913        Some(entry) => entry.id(),
6914        None => {
6915            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6916                name: name.to_string(),
6917                object_type,
6918            });
6919
6920            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6921        }
6922    };
6923
6924    let secret_as = query::plan_secret_as(scx, value)?;
6925
6926    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6927}
6928
6929pub fn describe_alter_connection(
6930    _: &StatementContext,
6931    _: AlterConnectionStatement<Aug>,
6932) -> Result<StatementDesc, PlanError> {
6933    Ok(StatementDesc::new(None))
6934}
6935
// Generates `AlterConnectionOptionExtracted`, which `plan_alter_connection`
// builds from the statement's `WITH` options via `try_from`; its `validate`
// field is read there as an optional `bool`.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6937
6938pub fn plan_alter_connection(
6939    scx: &StatementContext,
6940    stmt: AlterConnectionStatement<Aug>,
6941) -> Result<Plan, PlanError> {
6942    let AlterConnectionStatement {
6943        name,
6944        if_exists,
6945        actions,
6946        with_options,
6947    } = stmt;
6948    let conn_name = normalize::unresolved_item_name(name)?;
6949    let entry = match scx.catalog.resolve_item(&conn_name) {
6950        Ok(entry) => entry,
6951        Err(_) if if_exists => {
6952            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6953                name: conn_name.to_string(),
6954                object_type: ObjectType::Connection,
6955            });
6956
6957            return Ok(Plan::AlterNoop(AlterNoopPlan {
6958                object_type: ObjectType::Connection,
6959            }));
6960        }
6961        Err(e) => return Err(e.into()),
6962    };
6963
6964    let connection = entry.connection()?;
6965
6966    if actions
6967        .iter()
6968        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6969    {
6970        if actions.len() > 1 {
6971            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6972        }
6973
6974        if !with_options.is_empty() {
6975            sql_bail!(
6976                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6977                with_options
6978                    .iter()
6979                    .map(|o| o.to_ast_string_simple())
6980                    .join(", ")
6981            );
6982        }
6983
6984        if !matches!(connection, Connection::Ssh(_)) {
6985            sql_bail!(
6986                "{} is not an SSH connection",
6987                scx.catalog.resolve_full_name(entry.name())
6988            )
6989        }
6990
6991        return Ok(Plan::AlterConnection(AlterConnectionPlan {
6992            id: entry.id(),
6993            action: crate::plan::AlterConnectionAction::RotateKeys,
6994        }));
6995    }
6996
6997    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6998    if options.validate.is_some() {
6999        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7000    }
7001
7002    let validate = match options.validate {
7003        Some(val) => val,
7004        None => {
7005            scx.catalog
7006                .system_vars()
7007                .enable_default_connection_validation()
7008                && connection.validate_by_default()
7009        }
7010    };
7011
7012    let connection_type = match connection {
7013        Connection::Aws(_) => CreateConnectionType::Aws,
7014        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7015        Connection::Kafka(_) => CreateConnectionType::Kafka,
7016        Connection::Csr(_) => CreateConnectionType::Csr,
7017        Connection::Postgres(_) => CreateConnectionType::Postgres,
7018        Connection::Ssh(_) => CreateConnectionType::Ssh,
7019        Connection::MySql(_) => CreateConnectionType::MySql,
7020        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7021        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7022    };
7023
7024    // Collect all options irrespective of action taken on them.
7025    let specified_options: BTreeSet<_> = actions
7026        .iter()
7027        .map(|action: &AlterConnectionAction<Aug>| match action {
7028            AlterConnectionAction::SetOption(option) => Ok(option.name.clone()),
7029            AlterConnectionAction::DropOption(name) => Ok(name.clone()),
7030            AlterConnectionAction::RotateKeys => {
7031                Err(internal_err!("RotateKeys is handled separately above"))
7032            }
7033        })
7034        .collect::<Result<_, PlanError>>()?;
7035
7036    for invalid in INALTERABLE_OPTIONS {
7037        if specified_options.contains(invalid) {
7038            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7039        }
7040    }
7041
7042    connection::validate_options_per_connection_type(connection_type, specified_options)?;
7043
7044    // Partition operations into set and drop.
7045    let mut set_options_vec: Vec<_> = Vec::new();
7046    let mut drop_options: BTreeSet<_> = BTreeSet::new();
7047    for action in actions {
7048        match action {
7049            AlterConnectionAction::SetOption(option) => set_options_vec.push(option),
7050            AlterConnectionAction::DropOption(name) => {
7051                drop_options.insert(name);
7052            }
7053            AlterConnectionAction::RotateKeys => {
7054                bail_internal!("RotateKeys is handled separately above")
7055            }
7056        }
7057    }
7058
7059    let set_options: BTreeMap<_, _> = set_options_vec
7060        .clone()
7061        .into_iter()
7062        .map(|option| (option.name, option.value))
7063        .collect();
7064
7065    // Type check values + avoid duplicates; we don't want to e.g. let users
7066    // drop and set the same option in the same statement, so treating drops as
7067    // sets here is fine.
7068    let connection_options_extracted =
7069        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7070
7071    let duplicates: Vec<_> = connection_options_extracted
7072        .seen
7073        .intersection(&drop_options)
7074        .collect();
7075
7076    if !duplicates.is_empty() {
7077        sql_bail!(
7078            "cannot both SET and DROP/RESET options {}",
7079            duplicates
7080                .iter()
7081                .map(|option| option.to_string())
7082                .join(", ")
7083        )
7084    }
7085
7086    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7087        let set_options_count = mutually_exclusive_options
7088            .iter()
7089            .filter(|o| set_options.contains_key(o))
7090            .count();
7091        let drop_options_count = mutually_exclusive_options
7092            .iter()
7093            .filter(|o| drop_options.contains(o))
7094            .count();
7095
7096        // Disallow setting _and_ resetting mutually exclusive options
7097        if set_options_count > 0 && drop_options_count > 0 {
7098            sql_bail!(
7099                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7100                connection_type,
7101                mutually_exclusive_options
7102                    .iter()
7103                    .map(|option| option.to_string())
7104                    .join(", ")
7105            )
7106        }
7107
7108        // If any option is either set or dropped, ensure all mutually exclusive
7109        // options are dropped. We do this "behind the scenes", even though we
7110        // disallow users from performing the same action because this is the
7111        // mechanism by which we overwrite values elsewhere in the code.
7112        if set_options_count > 0 || drop_options_count > 0 {
7113            drop_options.extend(mutually_exclusive_options.iter().cloned());
7114        }
7115
7116        // n.b. if mutually exclusive options are set, those will error when we
7117        // try to replan the connection.
7118    }
7119
7120    Ok(Plan::AlterConnection(AlterConnectionPlan {
7121        id: entry.id(),
7122        action: crate::plan::AlterConnectionAction::AlterOptions {
7123            set_options,
7124            drop_options,
7125            validate,
7126        },
7127    }))
7128}
7129
7130pub fn describe_alter_sink(
7131    _: &StatementContext,
7132    _: AlterSinkStatement<Aug>,
7133) -> Result<StatementDesc, PlanError> {
7134    Ok(StatementDesc::new(None))
7135}
7136
7137pub fn plan_alter_sink(
7138    scx: &mut StatementContext,
7139    stmt: AlterSinkStatement<Aug>,
7140) -> Result<Plan, PlanError> {
7141    let AlterSinkStatement {
7142        sink_name,
7143        if_exists,
7144        action,
7145    } = stmt;
7146
7147    let object_type = ObjectType::Sink;
7148    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7149
7150    let Some(item) = item else {
7151        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7152            name: sink_name.to_string(),
7153            object_type,
7154        });
7155
7156        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7157    };
7158    // Always ALTER objects from their latest version.
7159    let item = item.at_version(RelationVersionSelector::Latest);
7160
7161    match action {
7162        AlterSinkAction::ChangeRelation(new_from) => {
7163            // First we reconstruct the original CREATE SINK statement
7164            let create_sql = item.create_sql();
7165            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7166            let [stmt]: [StatementParseResult; 1] = stmts
7167                .try_into()
7168                .map_err(|_| internal_err!("create SQL of sink was not exactly one statement"))?;
7169            let Statement::CreateSink(stmt) = stmt.ast else {
7170                bail_internal!("create SQL of sink is not a CREATE SINK statement");
7171            };
7172
7173            // Then resolve and swap the resolved from relation to the new one
7174            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7175            stmt.from = new_from;
7176
7177            // Finally re-plan the modified create sink statement to verify the new configuration is valid
7178            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7179                bail_internal!("plan_sink did not produce a CreateSink plan");
7180            };
7181
7182            plan.sink.version += 1;
7183
7184            Ok(Plan::AlterSink(AlterSinkPlan {
7185                item_id: item.id(),
7186                global_id: item.global_id(),
7187                sink: plan.sink,
7188                with_snapshot: plan.with_snapshot,
7189                in_cluster: plan.in_cluster,
7190            }))
7191        }
7192        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7193        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7194    }
7195}
7196
7197pub fn describe_alter_source(
7198    _: &StatementContext,
7199    _: AlterSourceStatement<Aug>,
7200) -> Result<StatementDesc, PlanError> {
7201    // TODO: put the options here, right?
7202    Ok(StatementDesc::new(None))
7203}
7204
// Declares the extracted form of `AlterSourceAddSubsourceOption`:
// `TextColumns` and `ExcludeColumns` are lists of item names that default to
// an empty vector, and `Details` is a `String` with no declared default.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7211
7212pub fn plan_alter_source(
7213    scx: &mut StatementContext,
7214    stmt: AlterSourceStatement<Aug>,
7215) -> Result<Plan, PlanError> {
7216    let AlterSourceStatement {
7217        source_name,
7218        if_exists,
7219        action,
7220    } = stmt;
7221    let object_type = ObjectType::Source;
7222
7223    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7224        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7225            name: source_name.to_string(),
7226            object_type,
7227        });
7228
7229        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7230    }
7231
7232    match action {
7233        AlterSourceAction::SetOptions(options) => {
7234            let mut options = options.into_iter();
7235            let option = options
7236                .next()
7237                .ok_or_else(|| sql_err!("ALTER SOURCE SET requires at least one option"))?;
7238            if option.name == CreateSourceOptionName::RetainHistory {
7239                if options.next().is_some() {
7240                    sql_bail!("RETAIN HISTORY must be only option");
7241                }
7242                return alter_retain_history(
7243                    scx,
7244                    object_type,
7245                    if_exists,
7246                    UnresolvedObjectName::Item(source_name),
7247                    option.value,
7248                );
7249            }
7250            if option.name == CreateSourceOptionName::TimestampInterval {
7251                if options.next().is_some() {
7252                    sql_bail!("TIMESTAMP INTERVAL must be only option");
7253                }
7254                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
7255            }
7256            // n.b we use this statement in purification in a way that cannot be
7257            // planned directly.
7258            sql_bail!(
7259                "Cannot modify the {} of a SOURCE.",
7260                option.name.to_ast_string_simple()
7261            );
7262        }
7263        AlterSourceAction::ResetOptions(reset) => {
7264            let mut options = reset.into_iter();
7265            let option = options
7266                .next()
7267                .ok_or_else(|| sql_err!("ALTER SOURCE RESET requires at least one option"))?;
7268            if option == CreateSourceOptionName::RetainHistory {
7269                if options.next().is_some() {
7270                    sql_bail!("RETAIN HISTORY must be only option");
7271                }
7272                return alter_retain_history(
7273                    scx,
7274                    object_type,
7275                    if_exists,
7276                    UnresolvedObjectName::Item(source_name),
7277                    None,
7278                );
7279            }
7280            if option == CreateSourceOptionName::TimestampInterval {
7281                if options.next().is_some() {
7282                    sql_bail!("TIMESTAMP INTERVAL must be only option");
7283                }
7284                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
7285            }
7286            sql_bail!(
7287                "Cannot modify the {} of a SOURCE.",
7288                option.to_ast_string_simple()
7289            );
7290        }
7291        AlterSourceAction::DropSubsources { .. } => {
7292            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7293        }
7294        AlterSourceAction::AddSubsources { .. } => {
7295            sql_bail!("ALTER SOURCE...ADD SUBSOURCE must be purified before planning")
7296        }
7297        AlterSourceAction::RefreshReferences => {
7298            sql_bail!("ALTER SOURCE...REFRESH REFERENCES must be purified before planning")
7299        }
7300    };
7301}
7302
7303pub fn describe_alter_system_set(
7304    _: &StatementContext,
7305    _: AlterSystemSetStatement,
7306) -> Result<StatementDesc, PlanError> {
7307    Ok(StatementDesc::new(None))
7308}
7309
7310pub fn plan_alter_system_set(
7311    _: &StatementContext,
7312    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7313) -> Result<Plan, PlanError> {
7314    let name = name.to_string();
7315    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7316        name,
7317        value: scl::plan_set_variable_to(to)?,
7318    }))
7319}
7320
7321pub fn describe_alter_system_reset(
7322    _: &StatementContext,
7323    _: AlterSystemResetStatement,
7324) -> Result<StatementDesc, PlanError> {
7325    Ok(StatementDesc::new(None))
7326}
7327
7328pub fn plan_alter_system_reset(
7329    _: &StatementContext,
7330    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7331) -> Result<Plan, PlanError> {
7332    let name = name.to_string();
7333    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7334}
7335
7336pub fn describe_alter_system_reset_all(
7337    _: &StatementContext,
7338    _: AlterSystemResetAllStatement,
7339) -> Result<StatementDesc, PlanError> {
7340    Ok(StatementDesc::new(None))
7341}
7342
7343pub fn plan_alter_system_reset_all(
7344    _: &StatementContext,
7345    _: AlterSystemResetAllStatement,
7346) -> Result<Plan, PlanError> {
7347    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7348}
7349
7350pub fn describe_alter_role(
7351    _: &StatementContext,
7352    _: AlterRoleStatement<Aug>,
7353) -> Result<StatementDesc, PlanError> {
7354    Ok(StatementDesc::new(None))
7355}
7356
7357pub fn plan_alter_role(
7358    scx: &StatementContext,
7359    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7360) -> Result<Plan, PlanError> {
7361    let option = match option {
7362        AlterRoleOption::Attributes(attrs) => {
7363            let attrs = plan_role_attributes(attrs, scx)?;
7364            PlannedAlterRoleOption::Attributes(attrs)
7365        }
7366        AlterRoleOption::Variable(variable) => {
7367            let var = plan_role_variable(variable)?;
7368            PlannedAlterRoleOption::Variable(var)
7369        }
7370    };
7371
7372    Ok(Plan::AlterRole(AlterRolePlan {
7373        id: name.id,
7374        name: name.name,
7375        option,
7376    }))
7377}
7378
7379pub fn describe_alter_table_add_column(
7380    _: &StatementContext,
7381    _: AlterTableAddColumnStatement<Aug>,
7382) -> Result<StatementDesc, PlanError> {
7383    Ok(StatementDesc::new(None))
7384}
7385
7386pub fn plan_alter_table_add_column(
7387    scx: &StatementContext,
7388    stmt: AlterTableAddColumnStatement<Aug>,
7389) -> Result<Plan, PlanError> {
7390    let AlterTableAddColumnStatement {
7391        if_exists,
7392        name,
7393        if_col_not_exist,
7394        column_name,
7395        data_type,
7396    } = stmt;
7397    let object_type = ObjectType::Table;
7398
7399    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7400
7401    let (relation_id, item_name, desc) =
7402        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7403            Some(item) => {
7404                // Always add columns to the latest version of the item.
7405                let item_name = scx.catalog.resolve_full_name(item.name());
7406                let item = item.at_version(RelationVersionSelector::Latest);
7407                let desc = item
7408                    .relation_desc()
7409                    .ok_or_else(|| sql_err!("item does not have a relation description"))?
7410                    .into_owned();
7411                (item.id(), item_name, desc)
7412            }
7413            None => {
7414                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7415                    name: name.to_ast_string_simple(),
7416                    object_type,
7417                });
7418                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7419            }
7420        };
7421
7422    let column_name = ColumnName::from(column_name.as_str());
7423    if desc.get_by_name(&column_name).is_some() {
7424        if if_col_not_exist {
7425            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7426                column_name: column_name.to_string(),
7427                object_name: item_name.item,
7428            });
7429            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7430        } else {
7431            return Err(PlanError::ColumnAlreadyExists {
7432                column_name,
7433                object_name: item_name.item,
7434            });
7435        }
7436    }
7437
7438    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7439    // TODO(alter_table): Support non-nullable columns with default values.
7440    let column_type = scalar_type.nullable(true);
7441    // "unresolve" our data type so we can later update the persisted create_sql.
7442    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7443
7444    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7445        relation_id,
7446        column_name,
7447        column_type,
7448        raw_sql_type,
7449    }))
7450}
7451
7452pub fn describe_alter_materialized_view_apply_replacement(
7453    _: &StatementContext,
7454    _: AlterMaterializedViewApplyReplacementStatement,
7455) -> Result<StatementDesc, PlanError> {
7456    Ok(StatementDesc::new(None))
7457}
7458
7459pub fn plan_alter_materialized_view_apply_replacement(
7460    scx: &StatementContext,
7461    stmt: AlterMaterializedViewApplyReplacementStatement,
7462) -> Result<Plan, PlanError> {
7463    let AlterMaterializedViewApplyReplacementStatement {
7464        if_exists,
7465        name,
7466        replacement_name,
7467    } = stmt;
7468
7469    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7470
7471    let object_type = ObjectType::MaterializedView;
7472    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7473        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7474            name: name.to_ast_string_simple(),
7475            object_type,
7476        });
7477        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7478    };
7479
7480    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7481        .ok_or_else(|| sql_err!("replacement materialized view does not exist"))?;
7482
7483    if replacement.replacement_target() != Some(mv.id()) {
7484        return Err(PlanError::InvalidReplacement {
7485            item_type: mv.item_type(),
7486            item_name: scx.catalog.minimal_qualification(mv.name()),
7487            replacement_type: replacement.item_type(),
7488            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7489        });
7490    }
7491
7492    Ok(Plan::AlterMaterializedViewApplyReplacement(
7493        AlterMaterializedViewApplyReplacementPlan {
7494            id: mv.id(),
7495            replacement_id: replacement.id(),
7496        },
7497    ))
7498}
7499
7500pub fn describe_comment(
7501    _: &StatementContext,
7502    _: CommentStatement<Aug>,
7503) -> Result<StatementDesc, PlanError> {
7504    Ok(StatementDesc::new(None))
7505}
7506
7507pub fn plan_comment(
7508    scx: &mut StatementContext,
7509    stmt: CommentStatement<Aug>,
7510) -> Result<Plan, PlanError> {
7511    const MAX_COMMENT_LENGTH: usize = 1024;
7512
7513    let CommentStatement { object, comment } = stmt;
7514
7515    // TODO(parkmycar): Make max comment length configurable.
7516    if let Some(c) = &comment {
7517        if c.len() > 1024 {
7518            return Err(PlanError::CommentTooLong {
7519                length: c.len(),
7520                max_size: MAX_COMMENT_LENGTH,
7521            });
7522        }
7523    }
7524
7525    let (object_id, column_pos) = match &object {
7526        com_ty @ CommentObjectType::Table { name }
7527        | com_ty @ CommentObjectType::View { name }
7528        | com_ty @ CommentObjectType::MaterializedView { name }
7529        | com_ty @ CommentObjectType::Index { name }
7530        | com_ty @ CommentObjectType::Func { name }
7531        | com_ty @ CommentObjectType::Connection { name }
7532        | com_ty @ CommentObjectType::Source { name }
7533        | com_ty @ CommentObjectType::Sink { name }
7534        | com_ty @ CommentObjectType::Secret { name } => {
7535            let item = scx.get_item_by_resolved_name(name)?;
7536            match (com_ty, item.item_type()) {
7537                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7538                    (CommentObjectId::Table(item.id()), None)
7539                }
7540                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7541                    (CommentObjectId::View(item.id()), None)
7542                }
7543                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7544                    (CommentObjectId::MaterializedView(item.id()), None)
7545                }
7546                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7547                    (CommentObjectId::Index(item.id()), None)
7548                }
7549                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7550                    (CommentObjectId::Func(item.id()), None)
7551                }
7552                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7553                    (CommentObjectId::Connection(item.id()), None)
7554                }
7555                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7556                    (CommentObjectId::Source(item.id()), None)
7557                }
7558                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7559                    (CommentObjectId::Sink(item.id()), None)
7560                }
7561                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7562                    (CommentObjectId::Secret(item.id()), None)
7563                }
7564                (com_ty, cat_ty) => {
7565                    let expected_type = match com_ty {
7566                        CommentObjectType::Table { .. } => ObjectType::Table,
7567                        CommentObjectType::View { .. } => ObjectType::View,
7568                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7569                        CommentObjectType::Index { .. } => ObjectType::Index,
7570                        CommentObjectType::Func { .. } => ObjectType::Func,
7571                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7572                        CommentObjectType::Source { .. } => ObjectType::Source,
7573                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7574                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7575                        _ => sql_bail!("cannot comment on this object type"),
7576                    };
7577
7578                    return Err(PlanError::InvalidObjectType {
7579                        expected_type: SystemObjectType::Object(expected_type),
7580                        actual_type: SystemObjectType::Object(cat_ty.into()),
7581                        object_name: item.name().item.clone(),
7582                    });
7583                }
7584            }
7585        }
7586        CommentObjectType::Type { ty } => match ty {
7587            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7588                sql_bail!("cannot comment on anonymous list or map type");
7589            }
7590            ResolvedDataType::Named { id, modifiers, .. } => {
7591                if !modifiers.is_empty() {
7592                    sql_bail!("cannot comment on type with modifiers");
7593                }
7594                (CommentObjectId::Type(*id), None)
7595            }
7596            ResolvedDataType::Error => bail_internal!("unresolved data type"),
7597        },
7598        CommentObjectType::Column { name } => {
7599            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7600            match item.item_type() {
7601                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7602                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7603                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7604                CatalogItemType::MaterializedView => {
7605                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7606                }
7607                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7608                r => {
7609                    return Err(PlanError::Unsupported {
7610                        feature: format!("Specifying comments on a column of {r}"),
7611                        discussion_no: None,
7612                    });
7613                }
7614            }
7615        }
7616        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7617        CommentObjectType::Database { name } => {
7618            (CommentObjectId::Database(*name.database_id()), None)
7619        }
7620        CommentObjectType::Schema { name } => {
7621            // Temporary schemas cannot have comments - they are connection-specific
7622            // and transient. With lazy temporary schema creation, the temp schema
7623            // may not exist yet, but we still need to return the correct error.
7624            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7625                sql_bail!(
7626                    "cannot comment on schema {} because it is a temporary schema",
7627                    mz_repr::namespaces::MZ_TEMP_SCHEMA
7628                );
7629            }
7630            (
7631                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7632                None,
7633            )
7634        }
7635        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7636        CommentObjectType::ClusterReplica { name } => {
7637            let replica = scx.catalog.resolve_cluster_replica(name)?;
7638            (
7639                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7640                None,
7641            )
7642        }
7643        CommentObjectType::NetworkPolicy { name } => {
7644            (CommentObjectId::NetworkPolicy(name.id), None)
7645        }
7646    };
7647
7648    // Note: the `mz_comments` table uses an `Int4` for the column position, but in the catalog storage we
7649    // store a `usize` which would be a `Uint8`. We guard against a safe conversion here because
7650    // it's the easiest place to raise an error.
7651    //
7652    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7653    if let Some(p) = column_pos {
7654        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7655            max_num_columns: MAX_NUM_COLUMNS,
7656            req_num_columns: p,
7657        })?;
7658    }
7659
7660    Ok(Plan::Comment(CommentPlan {
7661        object_id,
7662        sub_component: column_pos,
7663        comment,
7664    }))
7665}
7666
7667pub(crate) fn resolve_cluster<'a>(
7668    scx: &'a StatementContext,
7669    name: &'a Ident,
7670    if_exists: bool,
7671) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7672    match scx.resolve_cluster(Some(name)) {
7673        Ok(cluster) => Ok(Some(cluster)),
7674        Err(_) if if_exists => Ok(None),
7675        Err(e) => Err(e),
7676    }
7677}
7678
7679pub(crate) fn resolve_cluster_replica<'a>(
7680    scx: &'a StatementContext,
7681    name: &QualifiedReplica,
7682    if_exists: bool,
7683) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7684    match scx.resolve_cluster(Some(&name.cluster)) {
7685        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7686            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7687            None if if_exists => Ok(None),
7688            None => Err(sql_err!(
7689                "CLUSTER {} has no CLUSTER REPLICA named {}",
7690                cluster.name(),
7691                name.replica.as_str().quoted(),
7692            )),
7693        },
7694        Err(_) if if_exists => Ok(None),
7695        Err(e) => Err(e),
7696    }
7697}
7698
7699pub(crate) fn resolve_database<'a>(
7700    scx: &'a StatementContext,
7701    name: &'a UnresolvedDatabaseName,
7702    if_exists: bool,
7703) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7704    match scx.resolve_database(name) {
7705        Ok(database) => Ok(Some(database)),
7706        Err(_) if if_exists => Ok(None),
7707        Err(e) => Err(e),
7708    }
7709}
7710
7711pub(crate) fn resolve_schema<'a>(
7712    scx: &'a StatementContext,
7713    name: UnresolvedSchemaName,
7714    if_exists: bool,
7715) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7716    match scx.resolve_schema(name) {
7717        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7718        Err(_) if if_exists => Ok(None),
7719        Err(e) => Err(e),
7720    }
7721}
7722
7723pub(crate) fn resolve_network_policy<'a>(
7724    scx: &'a StatementContext,
7725    name: Ident,
7726    if_exists: bool,
7727) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7728    match scx.catalog.resolve_network_policy(&name.to_string()) {
7729        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7730            id: policy.id(),
7731            name: policy.name().to_string(),
7732        })),
7733        Err(_) if if_exists => Ok(None),
7734        Err(e) => Err(e.into()),
7735    }
7736}
7737
7738pub(crate) fn resolve_item_or_type<'a>(
7739    scx: &'a StatementContext,
7740    object_type: ObjectType,
7741    name: UnresolvedItemName,
7742    if_exists: bool,
7743) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7744    let name = normalize::unresolved_item_name(name)?;
7745    let catalog_item = match object_type {
7746        ObjectType::Type => scx.catalog.resolve_type(&name),
7747        ObjectType::Table
7748        | ObjectType::View
7749        | ObjectType::MaterializedView
7750        | ObjectType::Source
7751        | ObjectType::Sink
7752        | ObjectType::Index
7753        | ObjectType::Role
7754        | ObjectType::Cluster
7755        | ObjectType::ClusterReplica
7756        | ObjectType::Secret
7757        | ObjectType::Connection
7758        | ObjectType::Database
7759        | ObjectType::Schema
7760        | ObjectType::Func
7761        | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
7762    };
7763
7764    match catalog_item {
7765        Ok(item) => {
7766            let is_type = ObjectType::from(item.item_type());
7767            if object_type == is_type {
7768                Ok(Some(item))
7769            } else {
7770                Err(PlanError::MismatchedObjectType {
7771                    name: scx.catalog.minimal_qualification(item.name()),
7772                    is_type,
7773                    expected_type: object_type,
7774                })
7775            }
7776        }
7777        Err(_) if if_exists => Ok(None),
7778        Err(e) => Err(e.into()),
7779    }
7780}
7781
7782/// Returns an error if the given cluster is a managed cluster
7783fn ensure_cluster_is_not_managed(
7784    scx: &StatementContext,
7785    cluster_id: ClusterId,
7786) -> Result<(), PlanError> {
7787    let cluster = scx.catalog.get_cluster(cluster_id);
7788    if cluster.is_managed() {
7789        Err(PlanError::ManagedCluster {
7790            cluster_name: cluster.name().to_string(),
7791        })
7792    } else {
7793        Ok(())
7794    }
7795}