
mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
    WebhookValidation, literal, plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
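
// Illustrative sketch (not part of the original source): the pattern accepts
// names that managed clusters assign to their replicas and rejects
// user-chosen ones.
//
//     assert!(MANAGED_REPLICA_PATTERN.is_match("r42"));
//     assert!(!MANAGED_REPLICA_PATTERN.is_match("replica1"));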

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns have types with a meaningful Persist-level ordering.
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}
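
// Usage sketch for `check_partition_by` (illustrative only; the desc
// construction mirrors `RelationDesc::new` as used elsewhere in this file,
// and the column names are hypothetical):
//
//     let typ = SqlRelationType::new(vec![
//         SqlScalarType::String.nullable(false),
//         SqlScalarType::Int64.nullable(false),
//     ]);
//     let desc = RelationDesc::new(typ, vec!["region".to_string(), "id".to_string()]);
//     // Ok: `[region]` is a prefix of the relation's columns.
//     check_partition_by(&desc, vec![ident!("region")])?;
//     // Error: `id` is not the first column, so the prefix check fails.
//     assert!(check_partition_by(&desc, vec![ident!("id")]).is_err());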

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}
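
// Examples of the name handling above (hypothetical identifiers):
//
//     CREATE SCHEMA sales;          -- resolved against the active database
//     CREATE SCHEMA my_db.sales;    -- `my_db` resolved explicitly
//     CREATE SCHEMA a.b.sales;      -- rejected: more than two components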

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
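
// Example statement exercising the planning above (hypothetical table):
//
//     CREATE TABLE t (a int NOT NULL, b text DEFAULT 'x', PRIMARY KEY (a));
//
// Here `a` is marked non-nullable twice over (NOT NULL and PRIMARY KEY), the
// DEFAULT expression is planned against `b`'s type, and the resulting key set
// `[[0]]` is only admitted behind `UNSAFE_ENABLE_TABLE_KEYS`.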

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
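
// Each `generate_extracted_config!` invocation above expands to an
// `<Option>Extracted` struct with one typed field per option plus the set of
// options actually `seen`, along with a `TryFrom<Vec<_>>` impl. A rough
// sketch of the shape generated for `CreateSourceOption` (illustrative, not
// the literal macro expansion):
//
//     struct CreateSourceOptionExtracted {
//         timestamp_interval: Option<Duration>,
//         retain_history: Option<OptionalDuration>,
//         seen: BTreeSet<CreateSourceOptionName>,
//     }
//     impl TryFrom<Vec<CreateSourceOption<Aug>>> for CreateSourceOptionExtracted { /* ... */ }
//
// This is why `plan_create_source` below can destructure the result of
// `CreateSourceOptionExtracted::try_from(with_options.clone())?`.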

pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}
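
// Example statement exercising the planning above (hypothetical names and
// secret; the option syntax follows `CreateWebhookSourceStatement`):
//
//     CREATE SOURCE my_webhook IN CLUSTER my_cluster FROM WEBHOOK
//         BODY FORMAT JSON
//         INCLUDE HEADER 'x-signature' AS signature
//         CHECK (WITH (HEADERS) headers->'x-signature' = 'hunter2');
//
// This plans a `body` column from the JSON body format, a nullable text
// column `signature` mapped from the header, and a validation expression
// that references the request, satisfying the `contains_column` check above.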

pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all the options related to the
    // primary source output should be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
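
// Illustration of the timeline defaulting above (hypothetical source): a
// Kafka source with `ENVELOPE UPSERT` lands on `Timeline::EpochMilliseconds`,
// while an envelope planned as `SourceEnvelope::CdcV2` gets an external
// timeline keyed by the source's full name.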

pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // text/exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}
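
// The `DETAILS` option round-trips output of purification, so the decode
// pipeline above is, in sketch form (the hex string here is fabricated):
//
//     let bytes = hex::decode("0a03...")?;                     // SQL option -> bytes
//     let proto = ProtoMySqlSourceDetails::decode(&*bytes)?;   // bytes -> protobuf
//     let details = MySqlSourceDetails::from_proto(proto)?;    // protobuf -> Rust type
//
// The SQL Server and Postgres planners below follow the same pattern with
// their respective proto types.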

fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // text columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        // exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _, // purified into `start_offset`
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // handled below
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}
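
// Example of the metadata mapping above (hypothetical source): with
// `INCLUDE PARTITION, OFFSET AS k_offset, HEADER 'trace-id' AS trace_id`,
// `metadata_columns` becomes
//
//     [("partition", Partition), ("k_offset", Offset),
//      ("trace_id", Header { key: "trace-id", use_bytes: false })]
//
// while `INCLUDE KEY` is deliberately skipped here and handled later by the
// key-envelope logic.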

fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is overly conservative. For
            // example, in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };
1346
1347    // The KEY VALUE load generator is the only UPSERT source that
1348    // has no encoding but defaults to `INCLUDE KEY`.
1349    //
1350    // As discussed
1351    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1352    // removing this special case amounts to deciding how to handle null keys
1353    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
1354    // this special case in.
1355    //
1356    // Note that this is safe because this generator is
1357    // 1. The only source with no encoding that can have its key included.
1358    // 2. Never produces null keys (or values, for that matter).
1359    let key_envelope_no_encoding = matches!(
1360        source_connection,
1361        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1362            load_generator: LoadGenerator::KeyValue(_),
1363            ..
1364        })
1365    );
1366    let mut key_envelope = get_key_envelope(
1367        include_metadata,
1368        encoding.as_ref(),
1369        key_envelope_no_encoding,
1370    )?;
1371
1372    match (&envelope, &key_envelope) {
1373        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1374        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1375            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1376        ),
1377        _ => {}
1378    };
1379
1380    // Not all source envelopes are compatible with all source connections.
1381    // Whoever constructs the source ingestion pipeline is responsible for
1382    // choosing compatible envelopes and connections.
1383    //
1384    // TODO(guswynn): unambiguously assert which connections and envelopes are
1385    // compatible in typechecking
1386    //
1387    // TODO: remove bails as more support for upsert is added.
1388    let envelope = match &envelope {
1389        // TODO: fixup key envelope
1390        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1391        ast::SourceEnvelope::Debezium => {
1392            //TODO check that key envelope is not set
1393            let after_idx = match typecheck_debezium(&value_desc) {
1394                Ok((_before_idx, after_idx)) => Ok(after_idx),
1395                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1396                    Some(DataEncoding::Avro(_)) => Err(type_err),
1397                    _ => Err(sql_err!(
1398                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1399                    )),
1400                },
1401            }?;
1402
1403            UnplannedSourceEnvelope::Upsert {
1404                style: UpsertStyle::Debezium { after_idx },
1405            }
1406        }
1407        ast::SourceEnvelope::Upsert {
1408            value_decode_err_policy,
1409        } => {
1410            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1411                None => {
1412                    if !key_envelope_no_encoding {
1413                        bail_unsupported!(format!(
1414                            "UPSERT requires a key/value format: {:?}",
1415                            format
1416                        ))
1417                    }
1418                    None
1419                }
1420                Some(key_encoding) => Some(key_encoding),
1421            };
1422            // `ENVELOPE UPSERT` implies `INCLUDE KEY` if it is not
1423            // explicitly specified.
1424            if key_envelope == KeyEnvelope::None {
1425                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1426            }
1427            // If the value decode error policy is not set, we use the default upsert style.
1428            let style = match value_decode_err_policy.as_slice() {
1429                [] => UpsertStyle::Default(key_envelope),
1430                [SourceErrorPolicy::Inline { alias }] => {
1431                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1432                    UpsertStyle::ValueErrInline {
1433                        key_envelope,
1434                        error_column: alias
1435                            .as_ref()
1436                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1437                    }
1438                }
1439                _ => {
1440                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1441                }
1442            };
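            // An illustrative example (assumed option syntax): `ENVELOPE UPSERT
            // (VALUE DECODING ERRORS = (INLINE AS decode_error))` selects
            // `UpsertStyle::ValueErrInline` with `error_column` set to
            // "decode_error", surfacing bad values in that column rather than
            // erroring the whole source.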
1443
1444            UnplannedSourceEnvelope::Upsert { style }
1445        }
1446        ast::SourceEnvelope::CdcV2 => {
1447            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1448            //TODO check that key envelope is not set
1449            match format {
1450                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1451                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1452            }
1453            UnplannedSourceEnvelope::CdcV2
1454        }
1455    };
1456
1457    let metadata_desc = included_column_desc(metadata_columns_desc);
1458    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1459
1460    Ok((desc, envelope, encoding))
1461}
1462
1463/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1464/// of columns and constraints.
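///
/// A minimal illustration (an assumed example, not from the shipped tests): a
/// column list like `(a int4 NOT NULL, b text, PRIMARY KEY (a))` produces a
/// two-column `RelationDesc` whose first column is non-nullable and whose
/// primary key is `[0]`; the checks below reject constraints that source
/// exports cannot enforce, such as foreign keys and check constraints.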
1465fn plan_source_export_desc(
1466    scx: &StatementContext,
1467    name: &UnresolvedItemName,
1468    columns: &Vec<ColumnDef<Aug>>,
1469    constraints: &Vec<TableConstraint<Aug>>,
1470) -> Result<RelationDesc, PlanError> {
1471    let names: Vec<_> = columns
1472        .iter()
1473        .map(|c| normalize::column_name(c.name.clone()))
1474        .collect();
1475
1476    if let Some(dup) = names.iter().duplicates().next() {
1477        sql_bail!("column {} specified more than once", dup.quoted());
1478    }
1479
1480    // Build initial relation type that handles declared data types
1481    // and NOT NULL constraints.
1482    let mut column_types = Vec::with_capacity(columns.len());
1483    let mut keys = Vec::new();
1484
1485    for (i, c) in columns.into_iter().enumerate() {
1486        let aug_data_type = &c.data_type;
1487        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1488        let mut nullable = true;
1489        for option in &c.options {
1490            match &option.option {
1491                ColumnOption::NotNull => nullable = false,
1492                ColumnOption::Default(_) => {
1493                    bail_unsupported!("Source export with default value")
1494                }
1495                ColumnOption::Unique { is_primary } => {
1496                    keys.push(vec![i]);
1497                    if *is_primary {
1498                        nullable = false;
1499                    }
1500                }
1501                other => {
1502                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1503                }
1504            }
1505        }
1506        column_types.push(ty.nullable(nullable));
1507    }
1508
1509    let mut seen_primary = false;
1510    'c: for constraint in constraints {
1511        match constraint {
1512            TableConstraint::Unique {
1513                name: _,
1514                columns,
1515                is_primary,
1516                nulls_not_distinct,
1517            } => {
1518                if seen_primary && *is_primary {
1519                    sql_bail!(
1520                        "multiple primary keys for source export {} are not allowed",
1521                        name.to_ast_string_stable()
1522                    );
1523                }
1524                seen_primary = *is_primary || seen_primary;
1525
1526                let mut key = vec![];
1527                for column in columns {
1528                    let column = normalize::column_name(column.clone());
1529                    match names.iter().position(|name| *name == column) {
1530                        None => sql_bail!("unknown column in constraint: {}", column),
1531                        Some(i) => {
1532                            let nullable = &mut column_types[i].nullable;
1533                            if *is_primary {
1534                                if *nulls_not_distinct {
1535                                    sql_bail!(
1536                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1537                                    );
1538                                }
1539                                *nullable = false;
1540                            } else if !(*nulls_not_distinct || !*nullable) {
1541                                // Non-primary key unique constraints are only keys if all of their
1542                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1543                                break 'c;
1544                            }
1545
1546                            key.push(i);
1547                        }
1548                    }
1549                }
1550
1551                if *is_primary {
1552                    keys.insert(0, key);
1553                } else {
1554                    keys.push(key);
1555                }
1556            }
1557            TableConstraint::ForeignKey { .. } => {
1558                bail_unsupported!("Source export with a foreign key")
1559            }
1560            TableConstraint::Check { .. } => {
1561                bail_unsupported!("Source export with a check constraint")
1562            }
1563        }
1564    }
1565
1566    let typ = SqlRelationType::new(column_types).with_keys(keys);
1567    let desc = RelationDesc::new(typ, names);
1568    Ok(desc)
1569}
1570
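// `generate_extracted_config!` (defined elsewhere in this crate) derives a
// `CreateSubsourceOptionExtracted` struct with one typed field per option
// (falling back to the given defaults) plus a `seen` set recording which
// options were explicitly specified; it is consumed via `try_into` below.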
1571generate_extracted_config!(
1572    CreateSubsourceOption,
1573    (Progress, bool, Default(false)),
1574    (ExternalReference, UnresolvedItemName),
1575    (RetainHistory, OptionalDuration),
1576    (TextColumns, Vec::<Ident>, Default(vec![])),
1577    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1578    (Details, String)
1579);
1580
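/// Plans `CREATE SUBSOURCE`. Subsource statements are generated internally in
/// response to `CREATE SOURCE` (see the invariant asserted below), so this
/// handler primarily re-validates ASTs produced by purification.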
1581pub fn plan_create_subsource(
1582    scx: &StatementContext,
1583    stmt: CreateSubsourceStatement<Aug>,
1584) -> Result<Plan, PlanError> {
1585    let CreateSubsourceStatement {
1586        name,
1587        columns,
1588        of_source,
1589        constraints,
1590        if_not_exists,
1591        with_options,
1592    } = &stmt;
1593
1594    let CreateSubsourceOptionExtracted {
1595        progress,
1596        retain_history,
1597        external_reference,
1598        text_columns,
1599        exclude_columns,
1600        details,
1601        seen: _,
1602    } = with_options.clone().try_into()?;
1603
1604    // This invariant is enforced during purification; we are responsible for
1605    // creating the AST for subsources as a response to CREATE SOURCE
1606    // statements, so this would fire in integration testing if we failed to
1607    // uphold it.
1608    assert!(
1609        progress ^ (external_reference.is_some() && of_source.is_some()),
1610        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1611    );
1612
1613    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615    let data_source = if let Some(source_reference) = of_source {
1616        // If the new source table syntax is forced, we should not be creating any non-progress
1617        // subsources.
1618        if scx.catalog.system_vars().enable_create_table_from_source()
1619            && scx.catalog.system_vars().force_source_table_syntax()
1620        {
1621            Err(PlanError::UseTablesForSources(
1622                "CREATE SUBSOURCE".to_string(),
1623            ))?;
1624        }
1625
1626        // This is a subsource with the "natural" dependency order, i.e. it is
1627        // not a legacy subsource with the inverted structure.
1628        let ingestion_id = *source_reference.item_id();
1629        let external_reference = external_reference.unwrap();
1630
1631        // Decode the details option stored on the subsource statement, which contains information
1632        // created during the purification process.
1633        let details = details
1634            .as_ref()
1635            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1636        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1637        let details =
1638            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1639        let details =
1640            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1641        let details = match details {
1642            SourceExportStatementDetails::Postgres { table } => {
1643                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1644                    column_casts: crate::pure::postgres::generate_column_casts(
1645                        scx,
1646                        &table,
1647                        &text_columns,
1648                    )?,
1649                    table,
1650                })
1651            }
1652            SourceExportStatementDetails::MySql {
1653                table,
1654                initial_gtid_set,
1655            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1656                table,
1657                initial_gtid_set,
1658                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1659                exclude_columns: exclude_columns
1660                    .into_iter()
1661                    .map(|c| c.into_string())
1662                    .collect(),
1663            }),
1664            SourceExportStatementDetails::SqlServer {
1665                table,
1666                capture_instance,
1667                initial_lsn,
1668            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1669                capture_instance,
1670                table,
1671                initial_lsn,
1672                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1673                exclude_columns: exclude_columns
1674                    .into_iter()
1675                    .map(|c| c.into_string())
1676                    .collect(),
1677            }),
1678            SourceExportStatementDetails::LoadGenerator { output } => {
1679                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1680            }
1681            SourceExportStatementDetails::Kafka {} => {
1682                bail_unsupported!("subsources cannot reference Kafka sources")
1683            }
1684        };
1685        DataSourceDesc::IngestionExport {
1686            ingestion_id,
1687            external_reference,
1688            details,
1689            // Subsources don't currently support non-default envelopes or encodings.
1690            data_config: SourceExportDataConfig {
1691                envelope: SourceEnvelope::None(NoneEnvelope {
1692                    key_envelope: KeyEnvelope::None,
1693                    key_arity: 0,
1694                }),
1695                encoding: None,
1696            },
1697        }
1698    } else if progress {
1699        DataSourceDesc::Progress
1700    } else {
1701        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1702    };
1703
1704    let if_not_exists = *if_not_exists;
1705    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1706
1707    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1708
1709    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1710    let source = Source {
1711        create_sql,
1712        data_source,
1713        desc,
1714        compaction_window,
1715    };
1716
1717    Ok(Plan::CreateSource(CreateSourcePlan {
1718        name,
1719        source,
1720        if_not_exists,
1721        timeline: Timeline::EpochMilliseconds,
1722        in_cluster: None,
1723    }))
1724}
1725
1726generate_extracted_config!(
1727    TableFromSourceOption,
1728    (TextColumns, Vec::<Ident>, Default(vec![])),
1729    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1730    (PartitionBy, Vec<Ident>),
1731    (RetainHistory, OptionalDuration),
1732    (Details, String)
1733);
1734
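/// Plans `CREATE TABLE ... FROM SOURCE`, which exports a single collection
/// from an existing source into a table. An illustrative sketch (option
/// syntax abbreviated; the `DETAILS` option is attached during purification):
///
/// ```sql
/// CREATE TABLE t FROM SOURCE my_source (REFERENCE my_schema.my_table);
/// ```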
1735pub fn plan_create_table_from_source(
1736    scx: &StatementContext,
1737    stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739    if !scx.catalog.system_vars().enable_create_table_from_source() {
1740        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741    }
1742
1743    let CreateTableFromSourceStatement {
1744        name,
1745        columns,
1746        constraints,
1747        if_not_exists,
1748        source,
1749        external_reference,
1750        envelope,
1751        format,
1752        include_metadata,
1753        with_options,
1754    } = &stmt;
1755
1756    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758    let TableFromSourceOptionExtracted {
1759        text_columns,
1760        exclude_columns,
1761        retain_history,
1762        partition_by,
1763        details,
1764        seen: _,
1765    } = with_options.clone().try_into()?;
1766
1767    let source_item = scx.get_item_by_resolved_name(source)?;
1768    let ingestion_id = source_item.id();
1769
1770    // Decode the details option stored on the statement, which contains information
1771    // created during the purification process.
1772    let details = details
1773        .as_ref()
1774        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776    let details =
1777        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778    let details =
1779        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782        && include_metadata
1783            .iter()
1784            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785    {
1786        // TODO(guswynn): should this be `bail_unsupported!`?
1787        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1788    }
1789    if !matches!(
1790        details,
1791        SourceExportStatementDetails::Kafka { .. }
1792            | SourceExportStatementDetails::LoadGenerator { .. }
1793    ) && !include_metadata.is_empty()
1794    {
1795        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796    }
1797
1798    let details = match details {
1799        SourceExportStatementDetails::Postgres { table } => {
1800            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1801                column_casts: crate::pure::postgres::generate_column_casts(
1802                    scx,
1803                    &table,
1804                    &text_columns,
1805                )?,
1806                table,
1807            })
1808        }
1809        SourceExportStatementDetails::MySql {
1810            table,
1811            initial_gtid_set,
1812        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1813            table,
1814            initial_gtid_set,
1815            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1816            exclude_columns: exclude_columns
1817                .into_iter()
1818                .map(|c| c.into_string())
1819                .collect(),
1820        }),
1821        SourceExportStatementDetails::SqlServer {
1822            table,
1823            capture_instance,
1824            initial_lsn,
1825        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1826            table,
1827            capture_instance,
1828            initial_lsn,
1829            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830            exclude_columns: exclude_columns
1831                .into_iter()
1832                .map(|c| c.into_string())
1833                .collect(),
1834        }),
1835        SourceExportStatementDetails::LoadGenerator { output } => {
1836            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1837        }
1838        SourceExportStatementDetails::Kafka {} => {
1839            if !include_metadata.is_empty()
1840                && !matches!(
1841                    envelope,
1842                    ast::SourceEnvelope::Upsert { .. }
1843                        | ast::SourceEnvelope::None
1844                        | ast::SourceEnvelope::Debezium
1845                )
1846            {
1847                // TODO(guswynn): should this be `bail_unsupported!`?
1848                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1849            }
1850
1851            let metadata_columns = include_metadata
1852                .into_iter()
1853                .flat_map(|item| match item {
1854                    SourceIncludeMetadata::Timestamp { alias } => {
1855                        let name = match alias {
1856                            Some(name) => name.to_string(),
1857                            None => "timestamp".to_owned(),
1858                        };
1859                        Some((name, KafkaMetadataKind::Timestamp))
1860                    }
1861                    SourceIncludeMetadata::Partition { alias } => {
1862                        let name = match alias {
1863                            Some(name) => name.to_string(),
1864                            None => "partition".to_owned(),
1865                        };
1866                        Some((name, KafkaMetadataKind::Partition))
1867                    }
1868                    SourceIncludeMetadata::Offset { alias } => {
1869                        let name = match alias {
1870                            Some(name) => name.to_string(),
1871                            None => "offset".to_owned(),
1872                        };
1873                        Some((name, KafkaMetadataKind::Offset))
1874                    }
1875                    SourceIncludeMetadata::Headers { alias } => {
1876                        let name = match alias {
1877                            Some(name) => name.to_string(),
1878                            None => "headers".to_owned(),
1879                        };
1880                        Some((name, KafkaMetadataKind::Headers))
1881                    }
1882                    SourceIncludeMetadata::Header {
1883                        alias,
1884                        key,
1885                        use_bytes,
1886                    } => Some((
1887                        alias.to_string(),
1888                        KafkaMetadataKind::Header {
1889                            key: key.clone(),
1890                            use_bytes: *use_bytes,
1891                        },
1892                    )),
1893                    SourceIncludeMetadata::Key { .. } => {
1894                        // handled below
1895                        None
1896                    }
1897                })
1898                .collect();
1899
1900            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1901        }
1902    };
1903
1904    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1905
1906    // Some source types (e.g. Postgres, MySQL, multi-output load-gen sources) define a value
1907    // schema during purification and populate the `columns` and `constraints` fields of the
1908    // statement, whereas other source types (e.g. Kafka, single-output load-gen sources) do not;
1909    // for the latter we fall back to the source connection's default schema.
1910    let (key_desc, value_desc) =
1911        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1912            let columns = match columns {
1913                TableFromSourceColumns::Defined(columns) => columns,
1914                _ => unreachable!(),
1915            };
1916            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1917            (None, desc)
1918        } else {
1919            let key_desc = source_connection.default_key_desc();
1920            let value_desc = source_connection.default_value_desc();
1921            (Some(key_desc), value_desc)
1922        };
1923
1924    let metadata_columns_desc = match &details {
1925        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1926            metadata_columns, ..
1927        }) => kafka_metadata_columns_desc(metadata_columns),
1928        _ => vec![],
1929    };
1930
1931    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1932        scx,
1933        &envelope,
1934        format,
1935        key_desc,
1936        value_desc,
1937        include_metadata,
1938        metadata_columns_desc,
1939        source_connection,
1940    )?;
1941    if let TableFromSourceColumns::Named(col_names) = columns {
1942        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1943    }
1944
1945    let names: Vec<_> = desc.iter_names().cloned().collect();
1946    if let Some(dup) = names.iter().duplicates().next() {
1947        sql_bail!("column {} specified more than once", dup.quoted());
1948    }
1949
1950    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1951
1952    // Allow users to specify a timeline. If they do not, determine a default
1953    // timeline for the source.
1954    let timeline = match envelope {
1955        SourceEnvelope::CdcV2 => {
1956            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1957        }
1958        _ => Timeline::EpochMilliseconds,
1959    };
1960
1961    if let Some(partition_by) = partition_by {
1962        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1963        check_partition_by(&desc, partition_by)?;
1964    }
1965
1966    let data_source = DataSourceDesc::IngestionExport {
1967        ingestion_id,
1968        external_reference: external_reference
1969            .as_ref()
1970            .expect("populated in purification")
1971            .clone(),
1972        details,
1973        data_config: SourceExportDataConfig { envelope, encoding },
1974    };
1975
1976    let if_not_exists = *if_not_exists;
1977
1978    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1979
1980    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1981    let table = Table {
1982        create_sql,
1983        desc: VersionedRelationDesc::new(desc),
1984        temporary: false,
1985        compaction_window,
1986        data_source: TableDataSource::DataSource {
1987            desc: data_source,
1988            timeline,
1989        },
1990    };
1991
1992    Ok(Plan::CreateTable(CreateTablePlan {
1993        name,
1994        table,
1995        if_not_exists,
1996    }))
1997}
1998
1999generate_extracted_config!(
2000    LoadGeneratorOption,
2001    (TickInterval, Duration),
2002    (AsOf, u64, Default(0_u64)),
2003    (UpTo, u64, Default(u64::MAX)),
2004    (ScaleFactor, f64),
2005    (MaxCardinality, u64),
2006    (Keys, u64),
2007    (SnapshotRounds, u64),
2008    (TransactionalSnapshot, bool),
2009    (ValueSize, u64),
2010    (Seed, u64),
2011    (Partitions, u64),
2012    (BatchSize, u64)
2013);
2014
2015impl LoadGeneratorOptionExtracted {
2016    pub(super) fn ensure_only_valid_options(
2017        &self,
2018        loadgen: &ast::LoadGenerator,
2019    ) -> Result<(), PlanError> {
2020        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2021
2022        let mut options = self.seen.clone();
2023
2024        let permitted_options: &[_] = match loadgen {
2025            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2026            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2027            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2028            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2029            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2030            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2031            ast::LoadGenerator::KeyValue => &[
2032                TickInterval,
2033                Keys,
2034                SnapshotRounds,
2035                TransactionalSnapshot,
2036                ValueSize,
2037                Seed,
2038                Partitions,
2039                BatchSize,
2040            ],
2041        };
2042
2043        for o in permitted_options {
2044            options.remove(o);
2045        }
2046
2047        if !options.is_empty() {
2048            sql_bail!(
2049                "{} load generators do not support {} values",
2050                loadgen,
2051                options.iter().join(", ")
2052            )
2053        }
2054
2055        Ok(())
2056    }
2057}
2058
2059pub(crate) fn load_generator_ast_to_generator(
2060    scx: &StatementContext,
2061    loadgen: &ast::LoadGenerator,
2062    options: &[LoadGeneratorOption<Aug>],
2063    include_metadata: &[SourceIncludeMetadata],
2064) -> Result<LoadGenerator, PlanError> {
2065    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2066    extracted.ensure_only_valid_options(loadgen)?;
2067
2068    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2069        sql_bail!("INCLUDE metadata is only supported with `KEY VALUE` load generators");
2070    }
2071
2072    let load_generator = match loadgen {
2073        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2074        ast::LoadGenerator::Clock => {
2075            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2076            LoadGenerator::Clock
2077        }
2078        ast::LoadGenerator::Counter => {
2079            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2080            let LoadGeneratorOptionExtracted {
2081                max_cardinality, ..
2082            } = extracted;
2083            LoadGenerator::Counter { max_cardinality }
2084        }
2085        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2086        ast::LoadGenerator::Datums => {
2087            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2088            LoadGenerator::Datums
2089        }
2090        ast::LoadGenerator::Tpch => {
2091            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2092
2093            // Default to 0.01 scale factor (=10MB).
2094            let sf: f64 = scale_factor.unwrap_or(0.01);
2095            if !sf.is_finite() || sf < 0.0 {
2096                sql_bail!("unsupported scale factor {sf}");
2097            }
2098
2099            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2100                let total = (sf * multiplier).floor();
2101                let mut i = i64::try_cast_from(total)
2102                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2103                if i < 1 {
2104                    i = 1;
2105                }
2106                Ok(i)
2107            };
2108
2109            // The multiplications here are safely unchecked because they will
2110            // overflow to infinity, which will be caught by `f_to_i`.
2111            let count_supplier = f_to_i(10_000f64)?;
2112            let count_part = f_to_i(200_000f64)?;
2113            let count_customer = f_to_i(150_000f64)?;
2114            let count_orders = f_to_i(150_000f64 * 10f64)?;
2115            let count_clerk = f_to_i(1_000f64)?;
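            // Worked example (illustrative): with the default scale factor of
            // 0.01, these come out to 100 suppliers, 2,000 parts, 1,500
            // customers, 15,000 orders, and 10 clerks, each clamped to at
            // least 1 by `f_to_i`.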
2116
2117            LoadGenerator::Tpch {
2118                count_supplier,
2119                count_part,
2120                count_customer,
2121                count_orders,
2122                count_clerk,
2123            }
2124        }
2125        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2126            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2127            let LoadGeneratorOptionExtracted {
2128                keys,
2129                snapshot_rounds,
2130                transactional_snapshot,
2131                value_size,
2132                tick_interval,
2133                seed,
2134                partitions,
2135                batch_size,
2136                ..
2137            } = extracted;
2138
2139            let mut include_offset = None;
2140            for im in include_metadata {
2141                match im {
2142                    SourceIncludeMetadata::Offset { alias } => {
2143                        include_offset = match alias {
2144                            Some(alias) => Some(alias.to_string()),
2145                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2146                        }
2147                    }
2148                    SourceIncludeMetadata::Key { .. } => continue,
2149
2150                    _ => {
2151                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2152                    }
2153                };
2154            }
2155
2156            let lgkv = KeyValueLoadGenerator {
2157                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2158                snapshot_rounds: snapshot_rounds
2159                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2160                // Defaults to true.
2161                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2162                value_size: value_size
2163                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2164                partitions: partitions
2165                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2166                tick_interval,
2167                batch_size: batch_size
2168                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2169                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2170                include_offset,
2171            };
2172
2173            if lgkv.keys == 0
2174                || lgkv.partitions == 0
2175                || lgkv.value_size == 0
2176                || lgkv.batch_size == 0
2177            {
2178                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2179            }
2180
2181            if lgkv.keys % lgkv.partitions != 0 {
2182                sql_bail!("KEYS must be a multiple of PARTITIONS")
2183            }
2184
2185            if lgkv.batch_size > lgkv.keys {
2186                sql_bail!("KEYS must be at least as large as BATCH SIZE")
2187            }
2188
2189            // This constraint simplifies the source implementation.
2190            // We can lift it later.
2191            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2192                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2193            }
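            // Worked example (illustrative): KEYS 128, PARTITIONS 4, BATCH
            // SIZE 8 passes (128 % 4 == 0 and (128 / 4) % 8 == 0), while BATCH
            // SIZE 12 would fail this divisibility check.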
2194
2195            if lgkv.snapshot_rounds == 0 {
2196                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2197            }
2198
2199            LoadGenerator::KeyValue(lgkv)
2200        }
2201    };
2202
2203    Ok(load_generator)
2204}
2205
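/// Checks that a Debezium-formatted value relation has the shape this planner
/// expects: a required `after` record column and, optionally, a `before`
/// record column of the same type. Returns the indexes of both columns.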
2206fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2207    let before = value_desc.get_by_name(&"before".into());
2208    let (after_idx, after_ty) = value_desc
2209        .get_by_name(&"after".into())
2210        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2211    let before_idx = if let Some((before_idx, before_ty)) = before {
2212        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2213            sql_bail!("'before' column must be of type record");
2214        }
2215        if before_ty != after_ty {
2216            sql_bail!("'before' type differs from 'after' column");
2217        }
2218        Some(before_idx)
2219    } else {
2220        None
2221    };
2222    Ok((before_idx, after_idx))
2223}
2224
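/// Resolves a `FORMAT`/`KEY FORMAT ... VALUE FORMAT` specifier into a
/// `SourceDataEncoding`, enforcing that key-requiring envelopes (`DEBEZIUM`,
/// `UPSERT`) were given a key format.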
2225fn get_encoding(
2226    scx: &StatementContext,
2227    format: &FormatSpecifier<Aug>,
2228    envelope: &ast::SourceEnvelope,
2229) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2230    let encoding = match format {
2231        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2232        FormatSpecifier::KeyValue { key, value } => {
2233            let key = {
2234                let encoding = get_encoding_inner(scx, key)?;
2235                Some(encoding.key.unwrap_or(encoding.value))
2236            };
2237            let value = get_encoding_inner(scx, value)?.value;
2238            SourceDataEncoding { key, value }
2239        }
2240    };
2241
2242    let requires_keyvalue = matches!(
2243        envelope,
2244        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2245    );
2246    let is_keyvalue = encoding.key.is_some();
2247    if requires_keyvalue && !is_keyvalue {
2248        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2249    };
2250
2251    Ok(encoding)
2252}
2253
2254/// Determine the cluster ID to use for this item.
2255///
2256/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2257/// Because of this, do not normalize/canonicalize the create SQL statement
2258/// until after calling this function.
2259fn source_sink_cluster_config<'a, 'ctx>(
2260    scx: &'a StatementContext<'ctx>,
2261    in_cluster: &mut Option<ResolvedClusterName>,
2262) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2263    let cluster = match in_cluster {
2264        None => {
2265            let cluster = scx.catalog.resolve_cluster(None)?;
2266            *in_cluster = Some(ResolvedClusterName {
2267                id: cluster.id(),
2268                print_name: None,
2269            });
2270            cluster
2271        }
2272        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2273    };
2274
2275    Ok(cluster)
2276}
2277
2278generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2279
2280#[derive(Debug)]
2281pub struct Schema {
2282    pub key_schema: Option<String>,
2283    pub value_schema: String,
2284    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2285    pub confluent_wire_format: bool,
2286}
2287
2288fn get_encoding_inner(
2289    scx: &StatementContext,
2290    format: &Format<Aug>,
2291) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2292    let value = match format {
2293        Format::Bytes => DataEncoding::Bytes,
2294        Format::Avro(schema) => {
2295            let Schema {
2296                key_schema,
2297                value_schema,
2298                csr_connection,
2299                confluent_wire_format,
2300            } = match schema {
2301                // TODO(jldlaughlin): we need a way to pass in primary key information
2302                // when building a source from a string or file.
2303                AvroSchema::InlineSchema {
2304                    schema: ast::Schema { schema },
2305                    with_options,
2306                } => {
2307                    let AvroSchemaOptionExtracted {
2308                        confluent_wire_format,
2309                        ..
2310                    } = with_options.clone().try_into()?;
2311
2312                    Schema {
2313                        key_schema: None,
2314                        value_schema: schema.clone(),
2315                        csr_connection: None,
2316                        confluent_wire_format,
2317                    }
2318                }
2319                AvroSchema::Csr {
2320                    csr_connection:
2321                        CsrConnectionAvro {
2322                            connection,
2323                            seed,
2324                            key_strategy: _,
2325                            value_strategy: _,
2326                        },
2327                } => {
2328                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2329                    let csr_connection = match item.connection()? {
2330                        Connection::Csr(_) => item.id(),
2331                        _ => {
2332                            sql_bail!(
2333                                "{} is not a schema registry connection",
2334                                scx.catalog
2335                                    .resolve_full_name(item.name())
2336                                    .to_string()
2337                                    .quoted()
2338                            )
2339                        }
2340                    };
2341
2342                    if let Some(seed) = seed {
2343                        Schema {
2344                            key_schema: seed.key_schema.clone(),
2345                            value_schema: seed.value_schema.clone(),
2346                            csr_connection: Some(csr_connection),
2347                            confluent_wire_format: true,
2348                        }
2349                    } else {
2350                        unreachable!("CSR seed resolution should already have been called: Avro")
2351                    }
2352                }
2353            };
2354
2355            if let Some(key_schema) = key_schema {
2356                return Ok(SourceDataEncoding {
2357                    key: Some(DataEncoding::Avro(AvroEncoding {
2358                        schema: key_schema,
2359                        csr_connection: csr_connection.clone(),
2360                        confluent_wire_format,
2361                    })),
2362                    value: DataEncoding::Avro(AvroEncoding {
2363                        schema: value_schema,
2364                        csr_connection,
2365                        confluent_wire_format,
2366                    }),
2367                });
2368            } else {
2369                DataEncoding::Avro(AvroEncoding {
2370                    schema: value_schema,
2371                    csr_connection,
2372                    confluent_wire_format,
2373                })
2374            }
2375        }
2376        Format::Protobuf(schema) => match schema {
2377            ProtobufSchema::Csr {
2378                csr_connection:
2379                    CsrConnectionProtobuf {
2380                        connection:
2381                            CsrConnection {
2382                                connection,
2383                                options,
2384                            },
2385                        seed,
2386                    },
2387            } => {
2388                if let Some(CsrSeedProtobuf { key, value }) = seed {
2389                    let item = scx.get_item_by_resolved_name(connection)?;
2390                    let _ = match item.connection()? {
2391                        Connection::Csr(connection) => connection,
2392                        _ => {
2393                            sql_bail!(
2394                                "{} is not a schema registry connection",
2395                                scx.catalog
2396                                    .resolve_full_name(item.name())
2397                                    .to_string()
2398                                    .quoted()
2399                            )
2400                        }
2401                    };
2402
2403                    if !options.is_empty() {
2404                        sql_bail!("Protobuf CSR connections do not support any options");
2405                    }
2406
2407                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2408                        descriptors: strconv::parse_bytes(&value.schema)?,
2409                        message_name: value.message_name.clone(),
2410                        confluent_wire_format: true,
2411                    });
2412                    if let Some(key) = key {
2413                        return Ok(SourceDataEncoding {
2414                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2415                                descriptors: strconv::parse_bytes(&key.schema)?,
2416                                message_name: key.message_name.clone(),
2417                                confluent_wire_format: true,
2418                            })),
2419                            value,
2420                        });
2421                    }
2422                    value
2423                } else {
2424                    unreachable!("CSR seed resolution should already have been called: Proto")
2425                }
2426            }
2427            ProtobufSchema::InlineSchema {
2428                message_name,
2429                schema: ast::Schema { schema },
2430            } => {
2431                let descriptors = strconv::parse_bytes(schema)?;
2432
2433                DataEncoding::Protobuf(ProtobufEncoding {
2434                    descriptors,
2435                    message_name: message_name.to_owned(),
2436                    confluent_wire_format: false,
2437                })
2438            }
2439        },
2440        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2441            regex: mz_repr::adt::regex::Regex::new(regex, false)
2442                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2443        }),
2444        Format::Csv { columns, delimiter } => {
2445            let columns = match columns {
2446                CsvColumns::Header { names } => {
2447                    if names.is_empty() {
2448                        sql_bail!("[internal error] column spec should get names in purify")
2449                    }
2450                    ColumnSpec::Header {
2451                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2452                    }
2453                }
2454                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2455            };
2456            DataEncoding::Csv(CsvEncoding {
2457                columns,
2458                delimiter: u8::try_from(*delimiter)
2459                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2460            })
2461        }
2462        Format::Json { array: false } => DataEncoding::Json,
2463        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2464        Format::Text => DataEncoding::Text,
2465    };
2466    Ok(SourceDataEncoding { key: None, value })
2467}
2468
2469/// Extract the key envelope, if it is requested
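///
/// For example (illustrative): `INCLUDE KEY AS k` with a key format yields
/// `KeyEnvelope::Named("k")`, while a bare `INCLUDE KEY` defers to
/// `get_unnamed_key_envelope` for the key encoding's natural shape.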
2470fn get_key_envelope(
2471    included_items: &[SourceIncludeMetadata],
2472    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2473    key_envelope_no_encoding: bool,
2474) -> Result<KeyEnvelope, PlanError> {
2475    let key_definition = included_items
2476        .iter()
2477        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2478    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2479        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2480            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2481            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2482            (Some(name), _) if key_envelope_no_encoding => {
2483                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2484            }
2485            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2486            (_, None) => {
2487                // `alias` == `None` means `INCLUDE KEY`
2488                // `alias` == `Some(_)` means `INCLUDE KEY AS ___`
2489                // Both cases make sense with the same error message.
2490                sql_bail!(
2491                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2492                        got bare FORMAT"
2493                );
2494            }
2495        }
2496    } else {
2497        Ok(KeyEnvelope::None)
2498    }
2499}
2500
2501/// Gets the key envelope for a given key encoding when no name for the key has
2502/// been requested by the user.
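///
/// For example (illustrative): a `TEXT`-formatted key becomes a single column
/// named `key`, whereas an Avro record key is flattened into one column per
/// record field.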
2503fn get_unnamed_key_envelope(
2504    key: Option<&DataEncoding<ReferencedConnection>>,
2505) -> Result<KeyEnvelope, PlanError> {
2506    // If the key is requested but comes from an unnamed type then it gets the name "key"
2507    //
2508    // Otherwise it gets the names of the columns in the type
2509    let is_composite = match key {
2510        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2511        Some(
2512            DataEncoding::Avro(_)
2513            | DataEncoding::Csv(_)
2514            | DataEncoding::Protobuf(_)
2515            | DataEncoding::Regex { .. },
2516        ) => true,
2517        None => false,
2518    };
2519
2520    if is_composite {
2521        Ok(KeyEnvelope::Flattened)
2522    } else {
2523        Ok(KeyEnvelope::Named("key".to_string()))
2524    }
2525}
2526
2527pub fn describe_create_view(
2528    _: &StatementContext,
2529    _: CreateViewStatement<Aug>,
2530) -> Result<StatementDesc, PlanError> {
2531    Ok(StatementDesc::new(None))
2532}
2533
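/// Plans the shared body of `CREATE VIEW` and `CREATE TEMPORARY VIEW`:
/// normalizes the statement, plans the root query, applies any column
/// renames, and packages the result as a catalog `View`.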
2534pub fn plan_view(
2535    scx: &StatementContext,
2536    def: &mut ViewDefinition<Aug>,
2537    temporary: bool,
2538) -> Result<(QualifiedItemName, View), PlanError> {
2539    let create_sql = normalize::create_statement(
2540        scx,
2541        Statement::CreateView(CreateViewStatement {
2542            if_exists: IfExistsBehavior::Error,
2543            temporary,
2544            definition: def.clone(),
2545        }),
2546    )?;
2547
2548    let ViewDefinition {
2549        name,
2550        columns,
2551        query,
2552    } = def;
2553
2554    let query::PlannedRootQuery {
2555        expr,
2556        mut desc,
2557        finishing,
2558        scope: _,
2559    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2560    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2561    // Note: Earlier, we considered persisting the finishing information with the view
2562    // here to help with database-issues#236. However, in the meantime, there might be a better
2563    // approach to solving database-issues#236:
2564    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2565    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2566        &finishing,
2567        expr.arity()
2568    ));
2569    if expr.contains_parameters()? {
2570        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2571    }
2572
2573    let dependencies = expr
2574        .depends_on()
2575        .into_iter()
2576        .map(|gid| scx.catalog.resolve_item_id(&gid))
2577        .collect();
2578
2579    let name = if temporary {
2580        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2581    } else {
2582        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2583    };
2584
2585    plan_utils::maybe_rename_columns(
2586        format!("view {}", scx.catalog.resolve_full_name(&name)),
2587        &mut desc,
2588        columns,
2589    )?;
2590    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2591
2592    if let Some(dup) = names.iter().duplicates().next() {
2593        sql_bail!("column {} specified more than once", dup.quoted());
2594    }
2595
2596    let view = View {
2597        create_sql,
2598        expr,
2599        dependencies,
2600        column_names: names,
2601        temporary,
2602    };
2603
2604    Ok((name, view))
2605}
2606
2607pub fn plan_create_view(
2608    scx: &StatementContext,
2609    mut stmt: CreateViewStatement<Aug>,
2610) -> Result<Plan, PlanError> {
2611    let CreateViewStatement {
2612        temporary,
2613        if_exists,
2614        definition,
2615    } = &mut stmt;
2616    let (name, view) = plan_view(scx, definition, *temporary)?;
2617
2618    // Override the statement-level IfExistsBehavior with Skip if this is
2619    // explicitly requested in the PlanContext (the default is `false`).
2620    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2621
2622    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2623        let if_exists = true;
2624        let cascade = false;
2625        let maybe_item_to_drop = plan_drop_item(
2626            scx,
2627            ObjectType::View,
2628            if_exists,
2629            definition.name.clone(),
2630            cascade,
2631        )?;
2632
2633        // Check if the new View depends on the item that we would be replacing.
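        // For example (illustrative): `CREATE OR REPLACE VIEW v AS SELECT ... FROM v`
        // must be rejected, because the new definition would depend on the very
        // item the replacement drops.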
2634        if let Some(id) = maybe_item_to_drop {
2635            let dependencies = view.expr.depends_on();
2636            let invalid_drop = scx
2637                .get_item(&id)
2638                .global_ids()
2639                .any(|gid| dependencies.contains(&gid));
2640            if invalid_drop {
2641                let item = scx.catalog.get_item(&id);
2642                sql_bail!(
2643                    "cannot replace view {0}: depended upon by new {0} definition",
2644                    scx.catalog.resolve_full_name(item.name())
2645                );
2646            }
2647
2648            Some(id)
2649        } else {
2650            None
2651        }
2652    } else {
2653        None
2654    };
2655    let drop_ids = replace
2656        .map(|id| {
2657            scx.catalog
2658                .item_dependents(id)
2659                .into_iter()
2660                .map(|id| id.unwrap_item_id())
2661                .collect()
2662        })
2663        .unwrap_or_default();
2664
2665    validate_view_dependencies(scx, &view.dependencies.0)?;
2666
2667    // Check for an object in the catalog with this same name
2668    let full_name = scx.catalog.resolve_full_name(&name);
2669    let partial_name = PartialItemName::from(full_name.clone());
2670    // For PostgreSQL compatibility, we need to prevent creating views when
2671    // there is an existing object *or* type of the same name.
2672    if let (Ok(item), IfExistsBehavior::Error, false) = (
2673        scx.catalog.resolve_item_or_type(&partial_name),
2674        *if_exists,
2675        ignore_if_exists_errors,
2676    ) {
2677        return Err(PlanError::ItemAlreadyExists {
2678            name: full_name.to_string(),
2679            item_type: item.item_type(),
2680        });
2681    }
2682
2683    Ok(Plan::CreateView(CreateViewPlan {
2684        name,
2685        view,
2686        replace,
2687        drop_ids,
2688        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2689        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2690    }))
2691}
2692
2693/// Validate the dependencies of a (materialized) view.
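///
/// Items that are the target of a pending replacement (see
/// `AlterMaterializedViewApplyReplacementStatement`) are not valid
/// dependencies and produce `PlanError::InvalidDependency`.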
2694fn validate_view_dependencies(
2695    scx: &StatementContext,
2696    dependencies: &BTreeSet<CatalogItemId>,
2697) -> Result<(), PlanError> {
2698    for id in dependencies {
2699        let item = scx.catalog.get_item(id);
2700        if item.replacement_target().is_some() {
2701            let name = scx.catalog.minimal_qualification(item.name());
2702            return Err(PlanError::InvalidDependency {
2703                name: name.to_string(),
2704                item_type: format!("replacement {}", item.item_type()),
2705            });
2706        }
2707    }
2708
2709    Ok(())
2710}
2711
2712pub fn describe_create_materialized_view(
2713    _: &StatementContext,
2714    _: CreateMaterializedViewStatement<Aug>,
2715) -> Result<StatementDesc, PlanError> {
2716    Ok(StatementDesc::new(None))
2717}
2718
2719pub fn describe_create_continual_task(
2720    _: &StatementContext,
2721    _: CreateContinualTaskStatement<Aug>,
2722) -> Result<StatementDesc, PlanError> {
2723    Ok(StatementDesc::new(None))
2724}
2725
2726pub fn describe_create_network_policy(
2727    _: &StatementContext,
2728    _: CreateNetworkPolicyStatement<Aug>,
2729) -> Result<StatementDesc, PlanError> {
2730    Ok(StatementDesc::new(None))
2731}
2732
2733pub fn describe_alter_network_policy(
2734    _: &StatementContext,
2735    _: AlterNetworkPolicyStatement<Aug>,
2736) -> Result<StatementDesc, PlanError> {
2737    Ok(StatementDesc::new(None))
2738}
2739
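/// Plans a `CREATE MATERIALIZED VIEW` statement.
///
/// An illustrative (not exhaustive) sketch of a statement this handles,
/// assuming a cluster `c` and a table `t(a, b)` already exist:
///
/// ```sql
/// CREATE MATERIALIZED VIEW mv IN CLUSTER c
///     WITH (ASSERT NOT NULL a, REFRESH EVERY '1 day')
///     AS SELECT a, b FROM t;
/// ```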
2740pub fn plan_create_materialized_view(
2741    scx: &StatementContext,
2742    mut stmt: CreateMaterializedViewStatement<Aug>,
2743) -> Result<Plan, PlanError> {
2744    let cluster_id =
2745        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2746    stmt.in_cluster = Some(ResolvedClusterName {
2747        id: cluster_id,
2748        print_name: None,
2749    });
2750
2751    let create_sql =
2752        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2753
2754    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2755    let name = scx.allocate_qualified_name(partial_name.clone())?;
2756
2757    let query::PlannedRootQuery {
2758        expr,
2759        mut desc,
2760        finishing,
2761        scope: _,
2762    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2763    // We get back a trivial finishing, see comment in `plan_view`.
2764    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2765        &finishing,
2766        expr.arity()
2767    ));
2768    if expr.contains_parameters()? {
2769        return Err(PlanError::ParameterNotAllowed(
2770            "materialized views".to_string(),
2771        ));
2772    }
2773
2774    plan_utils::maybe_rename_columns(
2775        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2776        &mut desc,
2777        &stmt.columns,
2778    )?;
2779    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2780
2781    let MaterializedViewOptionExtracted {
2782        assert_not_null,
2783        partition_by,
2784        retain_history,
2785        refresh,
2786        seen: _,
2787    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2788
2789    if let Some(partition_by) = partition_by {
2790        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2791        check_partition_by(&desc, partition_by)?;
2792    }
2793
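    // Assemble the refresh schedule from the REFRESH options. As an
    // illustrative sketch, `REFRESH EVERY '1 day' ALIGNED TO <ts>` is meant to
    // schedule refreshes at `<ts>` plus whole multiples of the interval; see
    // `Timestamp::round_up` for the rounding details.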
2794    let refresh_schedule = {
2795        let mut refresh_schedule = RefreshSchedule::default();
2796        let mut on_commits_seen = 0;
2797        for refresh_option_value in refresh {
2798            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2799                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2800            }
2801            match refresh_option_value {
2802                RefreshOptionValue::OnCommit => {
2803                    on_commits_seen += 1;
2804                }
2805                RefreshOptionValue::AtCreation => {
2806                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2807                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2808                }
2809                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2810                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2811                    let ecx = &ExprContext {
2812                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2813                        name: "REFRESH AT",
2814                        scope: &Scope::empty(),
2815                        relation_type: &SqlRelationType::empty(),
2816                        allow_aggregates: false,
2817                        allow_subqueries: false,
2818                        allow_parameters: false,
2819                        allow_windows: false,
2820                    };
2821                    let hir = plan_expr(ecx, &time)?.cast_to(
2822                        ecx,
2823                        CastContext::Assignment,
2824                        &SqlScalarType::MzTimestamp,
2825                    )?;
2826                    // (mz_now was purified away to a literal earlier)
2827                    let timestamp = hir
2828                        .into_literal_mz_timestamp()
2829                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2830                    refresh_schedule.ats.push(timestamp);
2831                }
2832                RefreshOptionValue::Every(RefreshEveryOptionValue {
2833                    interval,
2834                    aligned_to,
2835                }) => {
2836                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2837                    if interval.as_microseconds() <= 0 {
2838                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2839                    }
2840                    if interval.months != 0 {
2841                        // This limitation exists because we want intervals to be cleanly
2842                        // convertible to a Unix epoch timestamp difference. That no longer
2843                        // holds when the interval involves months, because months have
2844                        // variable lengths. See `Timestamp::round_up`.
2845                        sql_bail!("REFRESH interval must not involve units larger than days");
2846                    }
2847                    let interval = interval.duration()?;
2848                    if u64::try_from(interval.as_millis()).is_err() {
2849                        sql_bail!("REFRESH interval too large");
2850                    }
2851
2852                    let mut aligned_to = match aligned_to {
2853                        Some(aligned_to) => aligned_to,
2854                        None => {
2855                            soft_panic_or_log!(
2856                                "ALIGNED TO should have been filled in by purification"
2857                            );
2858                            sql_bail!(
2859                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2860                            )
2861                        }
2862                    };
2863
2864                    // Desugar the `aligned_to` expression
2865                    transform_ast::transform(scx, &mut aligned_to)?;
2866
2867                    let ecx = &ExprContext {
2868                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2869                        name: "REFRESH EVERY ... ALIGNED TO",
2870                        scope: &Scope::empty(),
2871                        relation_type: &SqlRelationType::empty(),
2872                        allow_aggregates: false,
2873                        allow_subqueries: false,
2874                        allow_parameters: false,
2875                        allow_windows: false,
2876                    };
2877                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2878                        ecx,
2879                        CastContext::Assignment,
2880                        &SqlScalarType::MzTimestamp,
2881                    )?;
2882                    // (mz_now was purified away to a literal earlier)
2883                    let aligned_to_const = aligned_to_hir
2884                        .into_literal_mz_timestamp()
2885                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2886
2887                    refresh_schedule.everies.push(RefreshEvery {
2888                        interval,
2889                        aligned_to: aligned_to_const,
2890                    });
2891                }
2892            }
2893        }
2894
2895        if on_commits_seen > 1 {
2896            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2897        }
2898        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2899            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2900        }
2901
2902        if refresh_schedule == RefreshSchedule::default() {
2903            None
2904        } else {
2905            Some(refresh_schedule)
2906        }
2907    };
2908
2909    let as_of = stmt.as_of.map(Timestamp::from);
2910    let compaction_window = plan_retain_history_option(scx, retain_history)?;
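    // Resolve each ASSERT NOT NULL column name to its index in the output
    // columns. E.g. (illustrative), with columns (a, b, c) and
    // `ASSERT NOT NULL b`, this yields [1].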
2911    let mut non_null_assertions = assert_not_null
2912        .into_iter()
2913        .map(normalize::column_name)
2914        .map(|assertion_name| {
2915            column_names
2916                .iter()
2917                .position(|col| col == &assertion_name)
2918                .ok_or_else(|| {
2919                    sql_err!(
2920                        "column {} in ASSERT NOT NULL option not found",
2921                        assertion_name.quoted()
2922                    )
2923                })
2924        })
2925        .collect::<Result<Vec<_>, _>>()?;
2926    non_null_assertions.sort();
2927    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2928        let dup = &column_names[*dup];
2929        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2930    }
2931
2932    if let Some(dup) = column_names.iter().duplicates().next() {
2933        sql_bail!("column {} specified more than once", dup.quoted());
2934    }
2935
2936    // Override the statement-level IfExistsBehavior with Skip if this is
2937    // explicitly requested in the PlanContext (the default is `false`).
2938    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2939        Ok(true) => IfExistsBehavior::Skip,
2940        _ => stmt.if_exists,
2941    };
2942
2943    let mut replace = None;
2944    let mut if_not_exists = false;
2945    match if_exists {
2946        IfExistsBehavior::Replace => {
2947            let if_exists = true;
2948            let cascade = false;
2949            let replace_id = plan_drop_item(
2950                scx,
2951                ObjectType::MaterializedView,
2952                if_exists,
2953                partial_name.clone().into(),
2954                cascade,
2955            )?;
2956
2957            // Check if the new Materialized View depends on the item that we would be replacing.
2958            if let Some(id) = replace_id {
2959                let dependencies = expr.depends_on();
2960                let invalid_drop = scx
2961                    .get_item(&id)
2962                    .global_ids()
2963                    .any(|gid| dependencies.contains(&gid));
2964                if invalid_drop {
2965                    let item = scx.catalog.get_item(&id);
2966                    sql_bail!(
2967                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2968                        scx.catalog.resolve_full_name(item.name())
2969                    );
2970                }
2971                replace = Some(id);
2972            }
2973        }
2974        IfExistsBehavior::Skip => if_not_exists = true,
2975        IfExistsBehavior::Error => (),
2976    }
2977    let drop_ids = replace
2978        .map(|id| {
2979            scx.catalog
2980                .item_dependents(id)
2981                .into_iter()
2982                .map(|id| id.unwrap_item_id())
2983                .collect()
2984        })
2985        .unwrap_or_default();
2986    let mut dependencies: BTreeSet<_> = expr
2987        .depends_on()
2988        .into_iter()
2989        .map(|gid| scx.catalog.resolve_item_id(&gid))
2990        .collect();
2991
2992    // Validate the replacement target, if one is given.
2993    let mut replacement_target = None;
2994    if let Some(target_name) = &stmt.replacement_for {
2995        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
2996
2997        let target = scx.get_item_by_resolved_name(target_name)?;
2998        if target.item_type() != CatalogItemType::MaterializedView {
2999            return Err(PlanError::InvalidReplacement {
3000                item_type: target.item_type(),
3001                item_name: scx.catalog.minimal_qualification(target.name()),
3002                replacement_type: CatalogItemType::MaterializedView,
3003                replacement_name: partial_name,
3004            });
3005        }
3006        if target.id().is_system() {
3007            sql_bail!(
3008                "cannot replace {} because it is required by the database system",
3009                scx.catalog.minimal_qualification(target.name()),
3010            );
3011        }
3012
3013        // Check for dependency cycles.
3014        for dependent in scx.catalog.item_dependents(target.id()) {
3015            if let ObjectId::Item(id) = dependent
3016                && dependencies.contains(&id)
3017            {
3018                sql_bail!(
3019                    "replacement would cause {} to depend on itself",
3020                    scx.catalog.minimal_qualification(target.name()),
3021                );
3022            }
3023        }
3024
3025        dependencies.insert(target.id());
3026
3027        for use_id in target.used_by() {
3028            let use_item = scx.get_item(use_id);
3029            if use_item.replacement_target() == Some(target.id()) {
3030                sql_bail!(
3031                    "cannot replace {} because it already has a replacement: {}",
3032                    scx.catalog.minimal_qualification(target.name()),
3033                    scx.catalog.minimal_qualification(use_item.name()),
3034                );
3035            }
3036        }
3037
3038        replacement_target = Some(target.id());
3039    }
3040
3041    validate_view_dependencies(scx, &dependencies)?;
3042
3043    // Check for an object in the catalog with this same name
3044    let full_name = scx.catalog.resolve_full_name(&name);
3045    let partial_name = PartialItemName::from(full_name.clone());
3046    // For PostgreSQL compatibility, we need to prevent creating materialized
3047    // views when there is an existing object *or* type of the same name.
3048    if let (IfExistsBehavior::Error, Ok(item)) =
3049        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3050    {
3051        return Err(PlanError::ItemAlreadyExists {
3052            name: full_name.to_string(),
3053            item_type: item.item_type(),
3054        });
3055    }
3056
3057    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3058        name,
3059        materialized_view: MaterializedView {
3060            create_sql,
3061            expr,
3062            dependencies: DependencyIds(dependencies),
3063            column_names,
3064            replacement_target,
3065            cluster_id,
3066            non_null_assertions,
3067            compaction_window,
3068            refresh_schedule,
3069            as_of,
3070        },
3071        replace,
3072        drop_ids,
3073        if_not_exists,
3074        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3075    }))
3076}
3077
3078generate_extracted_config!(
3079    MaterializedViewOption,
3080    (AssertNotNull, Ident, AllowMultiple),
3081    (PartitionBy, Vec<Ident>),
3082    (RetainHistory, OptionalDuration),
3083    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
3084);
3085
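/// Plans a `CREATE CONTINUAL TASK` statement.
///
/// A rough, illustrative sketch of the unsugared form (exact syntax may
/// differ), assuming an input table `t` already exists:
///
/// ```sql
/// CREATE CONTINUAL TASK ct (key int, val int) ON INPUT t AS (
///     DELETE FROM ct WHERE key IN (SELECT key FROM t);
///     INSERT INTO ct SELECT key, val FROM t;
/// );
/// ```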
3086pub fn plan_create_continual_task(
3087    scx: &StatementContext,
3088    mut stmt: CreateContinualTaskStatement<Aug>,
3089) -> Result<Plan, PlanError> {
3090    match &stmt.sugar {
3091        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3092        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3093            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3094        }
3095        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3096            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3097        }
3098    };
3099    let cluster_id = match &stmt.in_cluster {
3100        None => scx.catalog.resolve_cluster(None)?.id(),
3101        Some(in_cluster) => in_cluster.id,
3102    };
3103    stmt.in_cluster = Some(ResolvedClusterName {
3104        id: cluster_id,
3105        print_name: None,
3106    });
3107
3108    let create_sql =
3109        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3110
3111    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3112
3113    // For a CT that e.g. simply filters its input, it seems desirable to keep
3114    // the input's nullability. So, start by assuming all columns are
3115    // non-nullable, and then make them nullable below if any of the exprs plan
3116    // them as nullable.
3117    let mut desc = match stmt.columns {
3118        None => None,
3119        Some(columns) => {
3120            let mut desc_columns = Vec::with_capacity(columns.len());
3121            for col in columns.iter() {
3122                desc_columns.push((
3123                    normalize::column_name(col.name.clone()),
3124                    SqlColumnType {
3125                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3126                        nullable: false,
3127                    },
3128                ));
3129            }
3130            Some(RelationDesc::from_names_and_types(desc_columns))
3131        }
3132    };
3133    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3134    match input.item_type() {
3135        // The input must be directly backed by a persist shard, so that we can
3136        // use a persist listen to rehydrate efficiently.
3137        CatalogItemType::ContinualTask
3138        | CatalogItemType::Table
3139        | CatalogItemType::MaterializedView
3140        | CatalogItemType::Source => {}
3141        CatalogItemType::Sink
3142        | CatalogItemType::View
3143        | CatalogItemType::Index
3144        | CatalogItemType::Type
3145        | CatalogItemType::Func
3146        | CatalogItemType::Secret
3147        | CatalogItemType::Connection => {
3148            sql_bail!(
3149                "CONTINUAL TASK cannot use {} as an input",
3150                input.item_type()
3151            );
3152        }
3153    }
3154
3155    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3156    let ct_name = stmt.name;
3157    let placeholder_id = match &ct_name {
3158        ResolvedItemName::ContinualTask { id, name } => {
3159            let desc = match desc.as_ref().cloned() {
3160                Some(x) => x,
3161                None => {
3162                    // The user didn't specify the CT's columns. Take a wild
3163                    // guess that the CT has the same shape as the input. It's
3164                    // fine if this is wrong, we'll get an error below after
3165                    // planning the query.
3166                    let desc = input.relation_desc().expect("item type checked above");
3167                    desc.into_owned()
3168                }
3169            };
3170            qcx.ctes.insert(
3171                *id,
3172                CteDesc {
3173                    name: name.item.clone(),
3174                    desc,
3175                },
3176            );
3177            Some(*id)
3178        }
3179        _ => None,
3180    };
3181
3182    let mut exprs = Vec::new();
3183    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3184        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3185        let query::PlannedRootQuery {
3186            expr,
3187            desc: desc_query,
3188            finishing,
3189            scope: _,
3190        } = query::plan_ct_query(&mut qcx, query)?;
3191        // We get back a trivial finishing because we plan with a "maintained"
3192        // QueryLifetime, see comment in `plan_view`.
3193        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3194            &finishing,
3195            expr.arity()
3196        ));
3197        if expr.contains_parameters()? {
3198            return Err(PlanError::ParameterNotAllowed(
3199                "continual tasks".to_string(),
3200            ));
3201        }
3204        let expr = match desc.as_mut() {
3205            None => {
3206                desc = Some(desc_query);
3207                expr
3208            }
3209            Some(desc) => {
3210                // We specify the columns for DELETE ourselves, so if any column
3211                // types don't match, the mismatch must come from an INSERT.
3212                if desc_query.arity() > desc.arity() {
3213                    sql_bail!(
3214                        "statement {}: INSERT has more expressions than target columns",
3215                        idx
3216                    );
3217                }
3218                if desc_query.arity() < desc.arity() {
3219                    sql_bail!(
3220                        "statement {}: INSERT has more target columns than expressions",
3221                        idx
3222                    );
3223                }
3224                // Ensure the types of the source query match the types of the target table,
3225                // installing assignment casts where necessary and possible.
3226                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3227                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3228                let expr = expr.map_err(|e| {
3229                    sql_err!(
3230                        "statement {}: column {} is of type {} but expression is of type {}",
3231                        idx,
3232                        desc.get_name(e.column).quoted(),
3233                        qcx.humanize_scalar_type(&e.target_type, false),
3234                        qcx.humanize_scalar_type(&e.source_type, false),
3235                    )
3236                })?;
3237
3238                // Update the CT's nullability as necessary. The arity checks above
3239                // verified that the two type lists have the same length, so the
3240                // `zip_eq` below is safe.
3240                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3241                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3242                if updated {
3243                    let new_types = zip_types().map(|(ct, q)| {
3244                        let mut ct = ct.clone();
3245                        if q.nullable {
3246                            ct.nullable = true;
3247                        }
3248                        ct
3249                    });
3250                    *desc = RelationDesc::from_names_and_types(
3251                        desc.iter_names().cloned().zip_eq(new_types),
3252                    );
3253                }
3254
3255                expr
3256            }
3257        };
3258        match stmt {
3259            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3260            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3261        }
3262    }
3263    // TODO(ct3): Collect things by output and assert that there is only one (or
3264    // support multiple outputs).
3265    let expr = exprs
3266        .into_iter()
3267        .reduce(|acc, expr| acc.union(expr))
3268        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3269    let dependencies = expr
3270        .depends_on()
3271        .into_iter()
3272        .map(|gid| scx.catalog.resolve_item_id(&gid))
3273        .collect();
3274
3275    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3276    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3277    if let Some(dup) = column_names.iter().duplicates().next() {
3278        sql_bail!("column {} specified more than once", dup.quoted());
3279    }
3280
3281    // Check for an object in the catalog with this same name
3282    let name = match &ct_name {
3283        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3284        ResolvedItemName::ContinualTask { name, .. } => {
3285            let name = scx.allocate_qualified_name(name.clone())?;
3286            let full_name = scx.catalog.resolve_full_name(&name);
3287            let partial_name = PartialItemName::from(full_name.clone());
3288            // For PostgreSQL compatibility, we need to prevent creating this when there
3289            // is an existing object *or* type of the same name.
3290            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3291                return Err(PlanError::ItemAlreadyExists {
3292                    name: full_name.to_string(),
3293                    item_type: item.item_type(),
3294                });
3295            }
3296            name
3297        }
3298        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3299        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3300    };
3301
3302    let as_of = stmt.as_of.map(Timestamp::from);
3303    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3304        name,
3305        placeholder_id,
3306        desc,
3307        input_id: input.global_id(),
3308        with_snapshot: snapshot.unwrap_or(true),
3309        continual_task: MaterializedView {
3310            create_sql,
3311            expr,
3312            dependencies,
3313            column_names,
3314            replacement_target: None,
3315            cluster_id,
3316            non_null_assertions: Vec::new(),
3317            compaction_window: None,
3318            refresh_schedule: None,
3319            as_of,
3320        },
3321    }))
3322}
3323
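/// Extracts the query to plan from a single continual task statement,
/// returning `None` for shapes that are not (yet) supported.
///
/// `DELETE`s are desugared into a `SELECT *` with the deletion predicate as a
/// `WHERE`; e.g. (illustrative), `DELETE FROM ct WHERE x > 5` becomes
/// `SELECT * FROM ct WHERE x > 5`, whose planned output is later negated into
/// retractions.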
3324fn continual_task_query<'a>(
3325    ct_name: &ResolvedItemName,
3326    stmt: &'a ast::ContinualTaskStmt<Aug>,
3327) -> Option<ast::Query<Aug>> {
3328    match stmt {
3329        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3330            table_name: _,
3331            columns,
3332            source,
3333            returning,
3334        }) => {
3335            if !columns.is_empty() || !returning.is_empty() {
3336                return None;
3337            }
3338            match source {
3339                ast::InsertSource::Query(query) => Some(query.clone()),
3340                ast::InsertSource::DefaultValues => None,
3341            }
3342        }
3343        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3344            table_name: _,
3345            alias,
3346            using,
3347            selection,
3348        }) => {
3349            if !using.is_empty() {
3350                return None;
3351            }
3352            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
3353            let from = ast::TableWithJoins {
3354                relation: ast::TableFactor::Table {
3355                    name: ct_name.clone(),
3356                    alias: alias.clone(),
3357                },
3358                joins: Vec::new(),
3359            };
3360            let select = ast::Select {
3361                from: vec![from],
3362                selection: selection.clone(),
3363                distinct: None,
3364                projection: vec![ast::SelectItem::Wildcard],
3365                group_by: Vec::new(),
3366                having: None,
3367                qualify: None,
3368                options: Vec::new(),
3369            };
3370            let query = ast::Query {
3371                ctes: ast::CteBlock::Simple(Vec::new()),
3372                body: ast::SetExpr::Select(Box::new(select)),
3373                order_by: Vec::new(),
3374                limit: None,
3375                offset: None,
3376            };
3377            // Then negate it to turn it into retractions (after planning it).
3378            Some(query)
3379        }
3380    }
3381}
3382
3383generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3384
3385pub fn describe_create_sink(
3386    _: &StatementContext,
3387    _: CreateSinkStatement<Aug>,
3388) -> Result<StatementDesc, PlanError> {
3389    Ok(StatementDesc::new(None))
3390}
3391
3392generate_extracted_config!(
3393    CreateSinkOption,
3394    (Snapshot, bool),
3395    (PartitionStrategy, String),
3396    (Version, u64),
3397    (CommitInterval, Duration)
3398);
3399
3400pub fn plan_create_sink(
3401    scx: &StatementContext,
3402    stmt: CreateSinkStatement<Aug>,
3403) -> Result<Plan, PlanError> {
3404    // Check for an object in the catalog with this same name
3405    let Some(name) = stmt.name.clone() else {
3406        return Err(PlanError::MissingName(CatalogItemType::Sink));
3407    };
3408    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3409    let full_name = scx.catalog.resolve_full_name(&name);
3410    let partial_name = PartialItemName::from(full_name.clone());
3411    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3412        return Err(PlanError::ItemAlreadyExists {
3413            name: full_name.to_string(),
3414            item_type: item.item_type(),
3415        });
3416    }
3417
3418    plan_sink(scx, stmt)
3419}
3420
3421/// Plans a sink as if it did not exist in the catalog, so that the planning logic can be
3422/// reused by both CREATE SINK and ALTER SINK planning. It is the callers' responsibility
3423/// (plan_create_sink and plan_alter_sink) to check for name collisions where that matters.
3425fn plan_sink(
3426    scx: &StatementContext,
3427    mut stmt: CreateSinkStatement<Aug>,
3428) -> Result<Plan, PlanError> {
3429    let CreateSinkStatement {
3430        name,
3431        in_cluster: _,
3432        from,
3433        connection,
3434        format,
3435        envelope,
3436        mode,
3437        if_not_exists,
3438        with_options,
3439    } = stmt.clone();
3440
3441    let Some(name) = name else {
3442        return Err(PlanError::MissingName(CatalogItemType::Sink));
3443    };
3444    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3445
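    // Kafka sinks are configured with ENVELOPE (UPSERT or DEBEZIUM), while
    // Iceberg sinks are configured with MODE (currently only UPSERT); using
    // the wrong one for the connection type is rejected below.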
3446    let envelope = match (&connection, envelope, mode) {
3447        // Kafka sinks use ENVELOPE
3448        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3449            SinkEnvelope::Upsert
3450        }
3451        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3452            SinkEnvelope::Debezium
3453        }
3454        (CreateSinkConnection::Kafka { .. }, None, None) => {
3455            sql_bail!("ENVELOPE clause is required")
3456        }
3457        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3458            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3459        }
3460        // Iceberg sinks use MODE
3461        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3462            SinkEnvelope::Upsert
3463        }
3464        (CreateSinkConnection::Iceberg { .. }, None, None) => {
3465            sql_bail!("MODE clause is required")
3466        }
3467        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3468            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3469        }
3470    };
3471
3472    let from_name = &from;
3473    let from = scx.get_item_by_resolved_name(&from)?;
3474
3475    {
3476        use CatalogItemType::*;
3477        match from.item_type() {
3478            Table | Source | MaterializedView | ContinualTask => {
3479                if from.replacement_target().is_some() {
3480                    let name = scx.catalog.minimal_qualification(from.name());
3481                    return Err(PlanError::InvalidSinkFrom {
3482                        name: name.to_string(),
3483                        item_type: format!("replacement {}", from.item_type()),
3484                    });
3485                }
3486            }
3487            Sink | View | Index | Type | Func | Secret | Connection => {
3488                let name = scx.catalog.minimal_qualification(from.name());
3489                return Err(PlanError::InvalidSinkFrom {
3490                    name: name.to_string(),
3491                    item_type: from.item_type().to_string(),
3492                });
3493            }
3494        }
3495    }
3496
3497    if from.id().is_system() {
3498        bail_unsupported!("creating a sink directly on a catalog object");
3499    }
3500
3501    let desc = from.relation_desc().expect("item type checked above");
3502    let key_indices = match &connection {
3503        CreateSinkConnection::Kafka { key: Some(key), .. }
3504        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3505            let key_columns = key
3506                .key_columns
3507                .clone()
3508                .into_iter()
3509                .map(normalize::column_name)
3510                .collect::<Vec<_>>();
3511            let mut uniq = BTreeSet::new();
3512            for col in key_columns.iter() {
3513                if !uniq.insert(col) {
3514                    sql_bail!("duplicate column referenced in KEY: {}", col);
3515                }
3516            }
3517            let indices = key_columns
3518                .iter()
3519                .map(|col| -> anyhow::Result<usize> {
3520                    let name_idx =
3521                        desc.get_by_name(col)
3522                            .map(|(idx, _type)| idx)
3523                            .ok_or_else(|| {
3524                                sql_err!("column referenced in KEY does not exist: {}", col)
3525                            })?;
3526                    if desc.get_unambiguous_name(name_idx).is_none() {
3527                        anyhow::bail!("column referenced in KEY is ambiguous: {}", col);
3528                    }
3529                    Ok(name_idx)
3530                })
3531                .collect::<Result<Vec<_>, _>>()?;
3532            let is_valid_key = desc
3533                .typ()
3534                .keys
3535                .iter()
3536                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3537
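            // The declared key is considered valid if it is a superset of some
            // natural (inferred) key of the relation. E.g. (illustrative), if
            // `desc` has the key (a), then `KEY (a, b)` passes this check but
            // `KEY (b)` alone does not.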
3538            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3539                if key.not_enforced {
3540                    scx.catalog
3541                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3542                            key: key_columns.clone(),
3543                            name: name.item.clone(),
3544                        })
3545                } else {
3546                    return Err(PlanError::UpsertSinkWithInvalidKey {
3547                        name: from_name.full_name_str(),
3548                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3549                        valid_keys: desc
3550                            .typ()
3551                            .keys
3552                            .iter()
3553                            .map(|key| {
3554                                key.iter()
3555                                    .map(|col| desc.get_name(*col).as_str().into())
3556                                    .collect()
3557                            })
3558                            .collect(),
3559                    });
3560                }
3561            }
3562            Some(indices)
3563        }
3564        CreateSinkConnection::Kafka { key: None, .. }
3565        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3566    };
3567
3568    let headers_index = match &connection {
3569        CreateSinkConnection::Kafka {
3570            headers: Some(headers),
3571            ..
3572        } => {
3573            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3574
3575            match envelope {
3576                SinkEnvelope::Upsert => (),
3577                SinkEnvelope::Debezium => {
3578                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3579                }
3580            };
3581
3582            let headers = normalize::column_name(headers.clone());
3583            let (idx, ty) = desc
3584                .get_by_name(&headers)
3585                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3586
3587            if desc.get_unambiguous_name(idx).is_none() {
3588                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3589            }
3590
3591            match &ty.scalar_type {
3592                SqlScalarType::Map { value_type, .. }
3593                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3594                _ => sql_bail!(
3595                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3596                ),
3597            }
3598
3599            Some(idx)
3600        }
3601        _ => None,
3602    };
3603
3604    // pick the first valid natural relation key, if any
3605    let relation_key_indices = desc.typ().keys.get(0).cloned();
3606
3607    let key_desc_and_indices = key_indices.map(|key_indices| {
3608        let cols = desc
3609            .iter()
3610            .map(|(name, ty)| (name.clone(), ty.clone()))
3611            .collect::<Vec<_>>();
3612        let (names, types): (Vec<_>, Vec<_>) =
3613            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3614        let typ = SqlRelationType::new(types);
3615        (RelationDesc::new(typ, names), key_indices)
3616    });
3617
3618    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3619        return Err(PlanError::UpsertSinkWithoutKey);
3620    }
3621
3622    let CreateSinkOptionExtracted {
3623        snapshot,
3624        version,
3625        partition_strategy: _,
3626        seen: _,
3627        commit_interval,
3628    } = with_options.try_into()?;
3629
3630    let connection_builder = match connection {
3631        CreateSinkConnection::Kafka {
3632            connection,
3633            options,
3634            ..
3635        } => kafka_sink_builder(
3636            scx,
3637            connection,
3638            options,
3639            format,
3640            relation_key_indices,
3641            key_desc_and_indices,
3642            headers_index,
3643            desc.into_owned(),
3644            envelope,
3645            from.id(),
3646            commit_interval,
3647        )?,
3648        CreateSinkConnection::Iceberg {
3649            connection,
3650            aws_connection,
3651            options,
3652            ..
3653        } => iceberg_sink_builder(
3654            scx,
3655            connection,
3656            aws_connection,
3657            options,
3658            relation_key_indices,
3659            key_desc_and_indices,
3660            commit_interval,
3661        )?,
3662    };
3663
3664    // WITH SNAPSHOT defaults to true
3665    let with_snapshot = snapshot.unwrap_or(true);
3666    // VERSION defaults to 0
3667    let version = version.unwrap_or(0);
3668
3669    // We will rewrite the cluster if one is not provided, so we must use the
3670    // `in_cluster` value we plan to normalize when we canonicalize the create
3671    // statement.
3672    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3673    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3674
3675    Ok(Plan::CreateSink(CreateSinkPlan {
3676        name,
3677        sink: Sink {
3678            create_sql,
3679            from: from.global_id(),
3680            connection: connection_builder,
3681            envelope,
3682            version,
3683            commit_interval,
3684        },
3685        with_snapshot,
3686        if_not_exists,
3687        in_cluster: in_cluster.id(),
3688    }))
3689}
3690
3691fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3692    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3693
3694    let existing_keys = desc
3695        .typ()
3696        .keys
3697        .iter()
3698        .map(|key_columns| {
3699            key_columns
3700                .iter()
3701                .map(|col| desc.get_name(*col).as_str())
3702                .join(", ")
3703        })
3704        .join(", ");
3705
3706    sql_err!(
3707        "Key constraint ({}) conflicts with existing key ({})",
3708        user_keys,
3709        existing_keys
3710    )
3711}
3712
3713/// Created by hand instead of via the `generate_extracted_config!` macro
3714/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3715#[derive(Debug, Default, PartialEq, Clone)]
3716pub struct CsrConfigOptionExtracted {
3717    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3718    pub(crate) avro_key_fullname: Option<String>,
3719    pub(crate) avro_value_fullname: Option<String>,
3720    pub(crate) null_defaults: bool,
3721    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3722    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3723    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3724    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3725}
3726
3727impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3728    type Error = crate::plan::PlanError;
3729    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3730        let mut extracted = CsrConfigOptionExtracted::default();
3731        let mut common_doc_comments = BTreeMap::new();
3732        for option in v {
3733            if !extracted.seen.insert(option.name.clone()) {
3734                return Err(PlanError::Unstructured({
3735                    format!("{} specified more than once", option.name)
3736                }));
3737            }
3738            let option_name = option.name.clone();
3739            let option_name_str = option_name.to_ast_string_simple();
3740            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3741                option_name: option_name.to_ast_string_simple(),
3742                err: e.into(),
3743            };
3744            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3745                val.map(|s| match s {
3746                    WithOptionValue::Value(Value::String(s)) => {
3747                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3748                    }
3749                    _ => Err("must be a string".to_string()),
3750                })
3751                .transpose()
3752                .map_err(PlanError::Unstructured)
3753                .map_err(better_error)
3754            };
3755            match option.name {
3756                CsrConfigOptionName::AvroKeyFullname => {
3757                    extracted.avro_key_fullname =
3758                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3759                }
3760                CsrConfigOptionName::AvroValueFullname => {
3761                    extracted.avro_value_fullname =
3762                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3763                }
3764                CsrConfigOptionName::NullDefaults => {
3765                    extracted.null_defaults =
3766                        <bool>::try_from_value(option.value).map_err(better_error)?;
3767                }
3768                CsrConfigOptionName::AvroDocOn(doc_on) => {
3769                    let value = String::try_from_value(option.value.ok_or_else(|| {
3770                        PlanError::InvalidOptionValue {
3771                            option_name: option_name_str,
3772                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3773                        }
3774                    })?)
3775                    .map_err(better_error)?;
3776                    let key = match doc_on.identifier {
3777                        DocOnIdentifier::Column(ast::ColumnName {
3778                            relation: ResolvedItemName::Item { id, .. },
3779                            column: ResolvedColumnReference::Column { name, index: _ },
3780                        }) => DocTarget::Field {
3781                            object_id: id,
3782                            column_name: name,
3783                        },
3784                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3785                            DocTarget::Type(id)
3786                        }
3787                        _ => unreachable!(),
3788                    };
3789
3790                    match doc_on.for_schema {
3791                        DocOnSchema::KeyOnly => {
3792                            extracted.key_doc_options.insert(key, value);
3793                        }
3794                        DocOnSchema::ValueOnly => {
3795                            extracted.value_doc_options.insert(key, value);
3796                        }
3797                        DocOnSchema::All => {
3798                            common_doc_comments.insert(key, value);
3799                        }
3800                    }
3801                }
3802                CsrConfigOptionName::KeyCompatibilityLevel => {
3803                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3804                }
3805                CsrConfigOptionName::ValueCompatibilityLevel => {
3806                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3807                }
3808            }
3809        }
3810
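        // Unqualified doc comments apply to both the key and the value schema,
        // but only as defaults: a KEY- or VALUE-qualified doc comment for the
        // same target takes precedence. E.g. (illustrative), `DOC ON TYPE t =
        // '...'` fills both sides unless `KEY DOC ON TYPE t = ...` is also
        // given.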
3811        for (key, value) in common_doc_comments {
3812            if !extracted.key_doc_options.contains_key(&key) {
3813                extracted.key_doc_options.insert(key.clone(), value.clone());
3814            }
3815            if !extracted.value_doc_options.contains_key(&key) {
3816                extracted.value_doc_options.insert(key, value);
3817            }
3818        }
3819        Ok(extracted)
3820    }
3821}
3822
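/// Builds the storage sink connection for an Iceberg sink.
///
/// Validates that `catalog_connection` names an Iceberg catalog connection and
/// `aws_connection` an AWS connection, and requires the `TABLE` and
/// `NAMESPACE` options as well as a `COMMIT INTERVAL`.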
3823fn iceberg_sink_builder(
3824    scx: &StatementContext,
3825    catalog_connection: ResolvedItemName,
3826    aws_connection: ResolvedItemName,
3827    options: Vec<IcebergSinkConfigOption<Aug>>,
3828    relation_key_indices: Option<Vec<usize>>,
3829    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3830    commit_interval: Option<Duration>,
3831) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3832    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3833    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3834    let catalog_connection_id = catalog_connection_item.id();
3835    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3836    let aws_connection_id = aws_connection_item.id();
3837    if !matches!(
3838        catalog_connection_item.connection()?,
3839        Connection::IcebergCatalog(_)
3840    ) {
3841        sql_bail!(
3842            "{} is not an iceberg catalog connection",
3843            scx.catalog
3844                .resolve_full_name(catalog_connection_item.name())
3845                .to_string()
3846                .quoted()
3847        );
3848    };
3849
3850    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3851        sql_bail!(
3852            "{} is not an AWS connection",
3853            scx.catalog
3854                .resolve_full_name(aws_connection_item.name())
3855                .to_string()
3856                .quoted()
3857        );
3858    }
3859
3860    let IcebergSinkConfigOptionExtracted {
3861        table,
3862        namespace,
3863        seen: _,
3864    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3865
3866    let Some(table) = table else {
3867        sql_bail!("Iceberg sink must specify TABLE");
3868    };
3869    let Some(namespace) = namespace else {
3870        sql_bail!("Iceberg sink must specify NAMESPACE");
3871    };
3872    if commit_interval.is_none() {
3873        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3874    }
3875
3876    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3877        catalog_connection_id,
3878        catalog_connection: catalog_connection_id,
3879        aws_connection_id,
3880        aws_connection: aws_connection_id,
3881        table,
3882        namespace,
3883        relation_key_indices,
3884        key_desc_and_indices,
3885    }))
3886}
3887
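/// Builds the storage sink connection for a Kafka sink from the resolved Kafka
/// connection, the format specifier, and the extracted `WITH` options.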
3888fn kafka_sink_builder(
3889    scx: &StatementContext,
3890    connection: ResolvedItemName,
3891    options: Vec<KafkaSinkConfigOption<Aug>>,
3892    format: Option<FormatSpecifier<Aug>>,
3893    relation_key_indices: Option<Vec<usize>>,
3894    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3895    headers_index: Option<usize>,
3896    value_desc: RelationDesc,
3897    envelope: SinkEnvelope,
3898    sink_from: CatalogItemId,
3899    commit_interval: Option<Duration>,
3900) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3901    // Get Kafka connection.
3902    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3903    let connection_id = connection_item.id();
3904    match connection_item.connection()? {
3905        Connection::Kafka(_) => (),
3906        _ => sql_bail!(
3907            "{} is not a kafka connection",
3908            scx.catalog.resolve_full_name(connection_item.name())
3909        ),
3910    };
3911
3912    if commit_interval.is_some() {
3913        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
3914    }
3915
3916    let KafkaSinkConfigOptionExtracted {
3917        topic,
3918        compression_type,
3919        partition_by,
3920        progress_group_id_prefix,
3921        transactional_id_prefix,
3922        legacy_ids,
3923        topic_config,
3924        topic_metadata_refresh_interval,
3925        topic_partition_count,
3926        topic_replication_factor,
3927        seen: _,
3928    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3929
3930    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3931        (Some(_), Some(true)) => {
3932            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3933        }
3934        (None, Some(true)) => KafkaIdStyle::Legacy,
3935        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3936    };
3937
3938    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3939        (Some(_), Some(true)) => {
3940            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3941        }
3942        (None, Some(true)) => KafkaIdStyle::Legacy,
3943        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3944    };
3945
3946    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3947
3948    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3949        // This is a librdkafka-enforced restriction that, if violated,
3950        // would result in a runtime error for the sink.
3951        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3952    }
3953
3954    let assert_positive = |val: Option<i32>, name: &str| {
3955        if let Some(val) = val {
3956            if val <= 0 {
3957                sql_bail!("{} must be a positive integer", name);
3958            }
3959        }
3960        val.map(NonNeg::try_from)
3961            .transpose()
3962            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3963    };
3964    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3965    let topic_replication_factor =
3966        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3967
3968    // Helper closure to parse Avro connection options for format specifiers that
3969    // use Avro for either key or value encoding.
3970    let gen_avro_schema_options = |conn| {
3971        let CsrConnectionAvro {
3972            connection:
3973                CsrConnection {
3974                    connection,
3975                    options,
3976                },
3977            seed,
3978            key_strategy,
3979            value_strategy,
3980        } = conn;
3981        if seed.is_some() {
3982            sql_bail!("SEED option does not make sense with sinks");
3983        }
3984        if key_strategy.is_some() {
3985            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3986        }
3987        if value_strategy.is_some() {
3988            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3989        }
3990
3991        let item = scx.get_item_by_resolved_name(&connection)?;
3992        let csr_connection = match item.connection()? {
3993            Connection::Csr(_) => item.id(),
3994            _ => {
3995                sql_bail!(
3996                    "{} is not a schema registry connection",
3997                    scx.catalog
3998                        .resolve_full_name(item.name())
3999                        .to_string()
4000                        .quoted()
4001                )
4002            }
4003        };
4004        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
4005
4006        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
4007            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
4008        }
4009
4010        if key_desc_and_indices.is_some()
4011            && (extracted_options.avro_key_fullname.is_some()
4012                ^ extracted_options.avro_value_fullname.is_some())
4013        {
4014            sql_bail!(
4015                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
4016            );
4017        }
4018
4019        Ok((csr_connection, extracted_options))
4020    };
4021
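    // Maps a single format specifier to a concrete Kafka sink format for
    // either the key or the value side. E.g. (illustrative), `FORMAT JSON`
    // maps to `KafkaSinkFormatType::Json`, and a single-column `FORMAT TEXT`
    // to `KafkaSinkFormatType::Text`.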
4022    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
4023        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
4024        Format::Bytes if desc.arity() == 1 => {
4025            let col_type = &desc.typ().column_types[0].scalar_type;
4026            if !mz_pgrepr::Value::can_encode_binary(col_type) {
4027                bail_unsupported!(format!(
4028                    "BYTES format with non-encodable type: {:?}",
4029                    col_type
4030                ));
4031            }
4032
4033            Ok(KafkaSinkFormatType::Bytes)
4034        }
4035        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
4036        Format::Bytes | Format::Text => {
4037            bail_unsupported!("BYTES or TEXT format with multiple columns")
4038        }
4039        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
4040        Format::Avro(AvroSchema::Csr { csr_connection }) => {
4041            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
4042            let schema = if is_key {
4043                AvroSchemaGenerator::new(
4044                    desc.clone(),
4045                    false,
4046                    options.key_doc_options,
4047                    options.avro_key_fullname.as_deref().unwrap_or("row"),
4048                    options.null_defaults,
4049                    Some(sink_from),
4050                    false,
4051                )?
4052                .schema()
4053                .to_string()
4054            } else {
4055                AvroSchemaGenerator::new(
4056                    desc.clone(),
4057                    matches!(envelope, SinkEnvelope::Debezium),
4058                    options.value_doc_options,
4059                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
4060                    options.null_defaults,
4061                    Some(sink_from),
4062                    true,
4063                )?
4064                .schema()
4065                .to_string()
4066            };
4067            Ok(KafkaSinkFormatType::Avro {
4068                schema,
4069                compatibility_level: if is_key {
4070                    options.key_compatibility_level
4071                } else {
4072                    options.value_compatibility_level
4073                },
4074                csr_connection,
4075            })
4076        }
4077        format => bail_unsupported!(format!("sink format {:?}", format)),
4078    };
4079
4080    let partition_by = match &partition_by {
4081        Some(partition_by) => {
4082            let mut scope = Scope::from_source(None, value_desc.iter_names());
4083
4084            match envelope {
4085                SinkEnvelope::Upsert => (),
4086                SinkEnvelope::Debezium => {
4087                    let key_indices: HashSet<_> = key_desc_and_indices
4088                        .as_ref()
4089                        .map(|(_desc, indices)| indices.as_slice())
4090                        .unwrap_or_default()
4091                        .into_iter()
4092                        .collect();
4093                    for (i, item) in scope.items.iter_mut().enumerate() {
4094                        if !key_indices.contains(&i) {
4095                            item.error_if_referenced = Some(|_table, column| {
4096                                PlanError::InvalidPartitionByEnvelopeDebezium {
4097                                    column_name: column.to_string(),
4098                                }
4099                            });
4100                        }
4101                    }
4102                }
4103            };
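                // (Illustrative) Under ENVELOPE DEBEZIUM, PARTITION BY may only
                // reference key columns: with key (id), `PARTITION BY id` is fine,
                // while `PARTITION BY val` hits the error installed above.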
4104
4105            let ecx = &ExprContext {
4106                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
4107                name: "PARTITION BY",
4108                scope: &scope,
4109                relation_type: value_desc.typ(),
4110                allow_aggregates: false,
4111                allow_subqueries: false,
4112                allow_parameters: false,
4113                allow_windows: false,
4114            };
4115            let expr = plan_expr(ecx, partition_by)?.cast_to(
4116                ecx,
4117                CastContext::Assignment,
4118                &SqlScalarType::UInt64,
4119            )?;
4120            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
4121
4122            Some(expr)
4123        }
4124        _ => None,
4125    };
4126
4127    // Map from the format specifier of the statement to the individual key/value formats for the sink.
4128    let format = match format {
4129        Some(FormatSpecifier::KeyValue { key, value }) => {
4130            let key_format = match key_desc_and_indices.as_ref() {
4131                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4132                None => None,
4133            };
4134            KafkaSinkFormat {
4135                value_format: map_format(value, &value_desc, false)?,
4136                key_format,
4137            }
4138        }
4139        Some(FormatSpecifier::Bare(format)) => {
4140            let key_format = match key_desc_and_indices.as_ref() {
4141                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4142                None => None,
4143            };
4144            KafkaSinkFormat {
4145                value_format: map_format(format, &value_desc, false)?,
4146                key_format,
4147            }
4148        }
4149        None => bail_unsupported!("sink without format"),
4150    };
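        // (Illustrative) A bare `FORMAT JSON` above is applied to both the key
        // (if any) and the value, while `KEY FORMAT TEXT VALUE FORMAT JSON`
        // maps each side independently.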
4151
4152    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4153        connection_id,
4154        connection: connection_id,
4155        format,
4156        topic: topic_name,
4157        relation_key_indices,
4158        key_desc_and_indices,
4159        headers_index,
4160        value_desc,
4161        partition_by,
4162        compression_type,
4163        progress_group_id,
4164        transactional_id,
4165        topic_options: KafkaTopicOptions {
4166            partition_count: topic_partition_count,
4167            replication_factor: topic_replication_factor,
4168            topic_config: topic_config.unwrap_or_default(),
4169        },
4170        topic_metadata_refresh_interval,
4171    }))
4172}
4173
4174pub fn describe_create_index(
4175    _: &StatementContext,
4176    _: CreateIndexStatement<Aug>,
4177) -> Result<StatementDesc, PlanError> {
4178    Ok(StatementDesc::new(None))
4179}
4180
4181pub fn plan_create_index(
4182    scx: &StatementContext,
4183    mut stmt: CreateIndexStatement<Aug>,
4184) -> Result<Plan, PlanError> {
4185    let CreateIndexStatement {
4186        name,
4187        on_name,
4188        in_cluster,
4189        key_parts,
4190        with_options,
4191        if_not_exists,
4192    } = &mut stmt;
4193    let on = scx.get_item_by_resolved_name(on_name)?;
4194
4195    {
4196        use CatalogItemType::*;
4197        match on.item_type() {
4198            Table | Source | View | MaterializedView | ContinualTask => {
4199                if on.replacement_target().is_some() {
4200                    sql_bail!(
4201                        "index cannot be created on {} because it is a replacement {}",
4202                        on_name.full_name_str(),
4203                        on.item_type(),
4204                    );
4205                }
4206            }
4207            Sink | Index | Type | Func | Secret | Connection => {
4208                sql_bail!(
4209                    "index cannot be created on {} because it is a {}",
4210                    on_name.full_name_str(),
4211                    on.item_type(),
4212                );
4213            }
4214        }
4215    }
4216
4217    let on_desc = on.relation_desc().expect("item type checked above");
4218
4219    let filled_key_parts = match key_parts {
4220        Some(kp) => kp.to_vec(),
4221        None => {
4222            // `key_parts` is None if we're creating a "default" index.
4223            let key = on_desc.typ().default_key();
4224            key.iter()
4225                .map(|i| match on_desc.get_unambiguous_name(*i) {
4226                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
4227                    _ => Expr::Value(Value::Number((i + 1).to_string())),
4228                })
4229                .collect()
4230        }
4231    };
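    // (Illustrative, based on `default_key`'s behavior) `CREATE DEFAULT INDEX ON t`
    // keys on a declared key of `t` if one exists, otherwise on all of its columns.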
4232    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4233
4234    let index_name = if let Some(name) = name {
4235        QualifiedItemName {
4236            qualifiers: on.name().qualifiers.clone(),
4237            item: normalize::ident(name.clone()),
4238        }
4239    } else {
4240        let mut idx_name = QualifiedItemName {
4241            qualifiers: on.name().qualifiers.clone(),
4242            item: on.name().item.clone(),
4243        };
4244        if key_parts.is_none() {
4245            // We're trying to create the "default" index.
4246            idx_name.item += "_primary_idx";
4247        } else {
4248            // Use PG schema for automatically naming indexes:
4249            // `<table>_<_-separated indexed expressions>_idx`
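                // For example, an index on columns `a` and `b` of table `t`
                // is auto-named `t_a_b_idx` (illustrative).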
4250            let index_name_col_suffix = keys
4251                .iter()
4252                .map(|k| match k {
4253                    mz_expr::MirScalarExpr::Column(i, name) => {
4254                        match (on_desc.get_unambiguous_name(*i), &name.0) {
4255                            (Some(col_name), _) => col_name.to_string(),
4256                            (None, Some(name)) => name.to_string(),
4257                            (None, None) => format!("{}", i + 1),
4258                        }
4259                    }
4260                    _ => "expr".to_string(),
4261                })
4262                .join("_");
4263            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4264                .expect("write on strings cannot fail");
4265            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4266        }
4267
4268        if !*if_not_exists {
4269            scx.catalog.find_available_name(idx_name)
4270        } else {
4271            idx_name
4272        }
4273    };
4274
4275    // Check for an object in the catalog with this same name
4276    let full_name = scx.catalog.resolve_full_name(&index_name);
4277    let partial_name = PartialItemName::from(full_name.clone());
4278    // For PostgreSQL compatibility, we need to prevent creating indexes when
4279    // there is an existing object *or* type of the same name.
4280    //
4281    // Technically, we only need to prevent coexistence of indexes and types
4282    // that have an associated relation (record types but not list/map types).
4283    // Enforcing that would be more complicated, though. It's backwards
4284    // compatible to weaken this restriction in the future.
4285    if let (Ok(item), false, false) = (
4286        scx.catalog.resolve_item_or_type(&partial_name),
4287        *if_not_exists,
4288        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4289    ) {
4290        return Err(PlanError::ItemAlreadyExists {
4291            name: full_name.to_string(),
4292            item_type: item.item_type(),
4293        });
4294    }
4295
4296    let options = plan_index_options(scx, with_options.clone())?;
4297    let cluster_id = match in_cluster {
4298        None => scx.resolve_cluster(None)?.id(),
4299        Some(in_cluster) => in_cluster.id,
4300    };
4301
4302    *in_cluster = Some(ResolvedClusterName {
4303        id: cluster_id,
4304        print_name: None,
4305    });
4306
4307    // Normalize `stmt`.
4308    *name = Some(Ident::new(index_name.item.clone())?);
4309    *key_parts = Some(filled_key_parts);
4310    let if_not_exists = *if_not_exists;
4311
4312    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4313    let compaction_window = options.iter().find_map(|o| {
4314        #[allow(irrefutable_let_patterns)]
4315        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4316            Some(lcw.clone())
4317        } else {
4318            None
4319        }
4320    });
4321
4322    Ok(Plan::CreateIndex(CreateIndexPlan {
4323        name: index_name,
4324        index: Index {
4325            create_sql,
4326            on: on.global_id(),
4327            keys,
4328            cluster_id,
4329            compaction_window,
4330        },
4331        if_not_exists,
4332    }))
4333}
4334
4335pub fn describe_create_type(
4336    _: &StatementContext,
4337    _: CreateTypeStatement<Aug>,
4338) -> Result<StatementDesc, PlanError> {
4339    Ok(StatementDesc::new(None))
4340}
4341
4342pub fn plan_create_type(
4343    scx: &StatementContext,
4344    stmt: CreateTypeStatement<Aug>,
4345) -> Result<Plan, PlanError> {
4346    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4347    let CreateTypeStatement { name, as_type, .. } = stmt;
4348
4349    fn validate_data_type(
4350        scx: &StatementContext,
4351        data_type: ResolvedDataType,
4352        as_type: &str,
4353        key: &str,
4354    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4355        let (id, modifiers) = match data_type {
4356            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4357            _ => sql_bail!(
4358                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4359                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4360                as_type,
4361                key,
4362                data_type.human_readable_name(),
4363            ),
4364        };
4365
4366        let item = scx.catalog.get_item(&id);
4367        match item.type_details() {
4368            None => sql_bail!(
4369                "{} must be of class type, but received {} which is of class {}",
4370                key,
4371                scx.catalog.resolve_full_name(item.name()),
4372                item.item_type()
4373            ),
4374            Some(CatalogTypeDetails {
4375                typ: CatalogType::Char,
4376                ..
4377            }) => {
4378                bail_unsupported!("embedding char type in a list or map")
4379            }
4380            _ => {
4381                // Validate that the modifiers are actually valid.
4382                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4383
4384                Ok((id, modifiers))
4385            }
4386        }
4387    }
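    // (Illustrative) Anonymous types must be named first, e.g.:
    //   CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
    //   CREATE TYPE int4_list_list AS LIST (ELEMENT TYPE = int4_list);
    // whereas `ELEMENT TYPE = int4 list` would be rejected as an unnamed type.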
4388
4389    let inner = match as_type {
4390        CreateTypeAs::List { options } => {
4391            let CreateTypeListOptionExtracted {
4392                element_type,
4393                seen: _,
4394            } = CreateTypeListOptionExtracted::try_from(options)?;
4395            let element_type =
4396                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4397            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4398            CatalogType::List {
4399                element_reference: id,
4400                element_modifiers: modifiers,
4401            }
4402        }
4403        CreateTypeAs::Map { options } => {
4404            let CreateTypeMapOptionExtracted {
4405                key_type,
4406                value_type,
4407                seen: _,
4408            } = CreateTypeMapOptionExtracted::try_from(options)?;
4409            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4410            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4411            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4412            let (value_id, value_modifiers) =
4413                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4414            CatalogType::Map {
4415                key_reference: key_id,
4416                key_modifiers,
4417                value_reference: value_id,
4418                value_modifiers,
4419            }
4420        }
4421        CreateTypeAs::Record { column_defs } => {
4422            let mut fields = vec![];
4423            for column_def in column_defs {
4424                let data_type = column_def.data_type;
4425                let key = ident(column_def.name.clone());
4426                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4427                fields.push(CatalogRecordField {
4428                    name: ColumnName::from(key.clone()),
4429                    type_reference: id,
4430                    type_modifiers: modifiers,
4431                });
4432            }
4433            CatalogType::Record { fields }
4434        }
4435    };
4436
4437    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4438
4439    // Check for an object in the catalog with this same name
4440    let full_name = scx.catalog.resolve_full_name(&name);
4441    let partial_name = PartialItemName::from(full_name.clone());
4442    // For PostgreSQL compatibility, we need to prevent creating types when
4443    // there is an existing object *or* type of the same name.
4444    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4445        if item.item_type().conflicts_with_type() {
4446            return Err(PlanError::ItemAlreadyExists {
4447                name: full_name.to_string(),
4448                item_type: item.item_type(),
4449            });
4450        }
4451    }
4452
4453    Ok(Plan::CreateType(CreateTypePlan {
4454        name,
4455        typ: Type { create_sql, inner },
4456    }))
4457}
4458
4459generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4460
4461generate_extracted_config!(
4462    CreateTypeMapOption,
4463    (KeyType, ResolvedDataType),
4464    (ValueType, ResolvedDataType)
4465);
4466
4467#[derive(Debug)]
4468pub enum PlannedAlterRoleOption {
4469    Attributes(PlannedRoleAttributes),
4470    Variable(PlannedRoleVariable),
4471}
4472
4473#[derive(Debug, Clone)]
4474pub struct PlannedRoleAttributes {
4475    pub inherit: Option<bool>,
4476    pub password: Option<Password>,
4477    pub scram_iterations: Option<NonZeroU32>,
4478    /// `nopassword` is set to true if the password from the parser is None.
4479    /// This is semantically different from not supplying a password at all,
4480    /// and allows for unsetting a password.
4481    pub nopassword: Option<bool>,
4482    pub superuser: Option<bool>,
4483    pub login: Option<bool>,
4484}
4485
4486fn plan_role_attributes(
4487    options: Vec<RoleAttribute>,
4488    scx: &StatementContext,
4489) -> Result<PlannedRoleAttributes, PlanError> {
4490    let mut planned_attributes = PlannedRoleAttributes {
4491        inherit: None,
4492        password: None,
4493        scram_iterations: None,
4494        superuser: None,
4495        login: None,
4496        nopassword: None,
4497    };
4498
4499    for option in options {
4500        match option {
4501            RoleAttribute::Inherit | RoleAttribute::NoInherit
4502                if planned_attributes.inherit.is_some() =>
4503            {
4504                sql_bail!("conflicting or redundant options");
4505            }
4506            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4507                bail_never_supported!(
4508                    "CREATECLUSTER attribute",
4509                    "sql/create-role/#details",
4510                    "Use system privileges instead."
4511                );
4512            }
4513            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4514                bail_never_supported!(
4515                    "CREATEDB attribute",
4516                    "sql/create-role/#details",
4517                    "Use system privileges instead."
4518                );
4519            }
4520            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4521                bail_never_supported!(
4522                    "CREATEROLE attribute",
4523                    "sql/create-role/#details",
4524                    "Use system privileges instead."
4525                );
4526            }
4527            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4528                sql_bail!("conflicting or redundant options");
4529            }
4530
4531            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4532            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4533            RoleAttribute::Password(password) => {
4534                if let Some(password) = password {
4535                    planned_attributes.password = Some(password.into());
4536                    planned_attributes.scram_iterations =
4537                        Some(scx.catalog.system_vars().scram_iterations())
4538                } else {
4539                    planned_attributes.nopassword = Some(true);
4540                }
4541            }
4542            RoleAttribute::SuperUser => {
4543                if planned_attributes.superuser == Some(false) {
4544                    sql_bail!("conflicting or redundant options");
4545                }
4546                planned_attributes.superuser = Some(true);
4547            }
4548            RoleAttribute::NoSuperUser => {
4549                if planned_attributes.superuser == Some(true) {
4550                    sql_bail!("conflicting or redundant options");
4551                }
4552                planned_attributes.superuser = Some(false);
4553            }
4554            RoleAttribute::Login => {
4555                if planned_attributes.login == Some(false) {
4556                    sql_bail!("conflicting or redundant options");
4557                }
4558                planned_attributes.login = Some(true);
4559            }
4560            RoleAttribute::NoLogin => {
4561                if planned_attributes.login == Some(true) {
4562                    sql_bail!("conflicting or redundant options");
4563                }
4564                planned_attributes.login = Some(false);
4565            }
4566        }
4567    }
4568    if planned_attributes.inherit == Some(false) {
4569        bail_unsupported!("non inherit roles");
4570    }
4571
4572    Ok(planned_attributes)
4573}
4574
4575#[derive(Debug)]
4576pub enum PlannedRoleVariable {
4577    Set { name: String, value: VariableValue },
4578    Reset { name: String },
4579}
4580
4581impl PlannedRoleVariable {
4582    pub fn name(&self) -> &str {
4583        match self {
4584            PlannedRoleVariable::Set { name, .. } => name,
4585            PlannedRoleVariable::Reset { name } => name,
4586        }
4587    }
4588}
4589
4590fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4591    let plan = match variable {
4592        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4593            name: name.to_string(),
4594            value: scl::plan_set_variable_to(value)?,
4595        },
4596        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4597            name: name.to_string(),
4598        },
4599    };
4600    Ok(plan)
4601}
4602
4603pub fn describe_create_role(
4604    _: &StatementContext,
4605    _: CreateRoleStatement,
4606) -> Result<StatementDesc, PlanError> {
4607    Ok(StatementDesc::new(None))
4608}
4609
4610pub fn plan_create_role(
4611    scx: &StatementContext,
4612    CreateRoleStatement { name, options }: CreateRoleStatement,
4613) -> Result<Plan, PlanError> {
4614    let attributes = plan_role_attributes(options, scx)?;
4615    Ok(Plan::CreateRole(CreateRolePlan {
4616        name: normalize::ident(name),
4617        attributes: attributes.into(),
4618    }))
4619}
4620
4621pub fn plan_create_network_policy(
4622    ctx: &StatementContext,
4623    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4624) -> Result<Plan, PlanError> {
4625    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4626    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4627
4628    let Some(rule_defs) = policy_options.rules else {
4629        sql_bail!("RULES must be specified when creating network policies.");
4630    };
4631
4632    let mut rules = vec![];
4633    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4634        let NetworkPolicyRuleOptionExtracted {
4635            seen: _,
4636            direction,
4637            action,
4638            address,
4639        } = options.try_into()?;
4640        let (direction, action, address) = match (direction, action, address) {
4641            (Some(direction), Some(action), Some(address)) => (
4642                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4643                NetworkPolicyRuleAction::try_from(action.as_str())?,
4644                PolicyAddress::try_from(address.as_str())?,
4645            ),
4646            (_, _, _) => {
4647                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4648            }
4649        };
4650        rules.push(NetworkPolicyRule {
4651            name: normalize::ident(name),
4652            direction,
4653            action,
4654            address,
4655        });
4656    }
4657
4658    if rules.len()
4659        > ctx
4660            .catalog
4661            .system_vars()
4662            .max_rules_per_network_policy()
4663            .try_into()?
4664    {
4665        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4666    }
4667
4668    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4669        name: normalize::ident(name),
4670        rules,
4671    }))
4672}
4673
4674pub fn plan_alter_network_policy(
4675    ctx: &StatementContext,
4676    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4677) -> Result<Plan, PlanError> {
4678    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4679
4680    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4681    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4682
4683    let Some(rule_defs) = policy_options.rules else {
4684        sql_bail!("RULES must be specified when creating network policies.");
4685    };
4686
4687    let mut rules = vec![];
4688    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4689        let NetworkPolicyRuleOptionExtracted {
4690            seen: _,
4691            direction,
4692            action,
4693            address,
4694        } = options.try_into()?;
4695
4696        let (direction, action, address) = match (direction, action, address) {
4697            (Some(direction), Some(action), Some(address)) => (
4698                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4699                NetworkPolicyRuleAction::try_from(action.as_str())?,
4700                PolicyAddress::try_from(address.as_str())?,
4701            ),
4702            (_, _, _) => {
4703                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4704            }
4705        };
4706        rules.push(NetworkPolicyRule {
4707            name: normalize::ident(name),
4708            direction,
4709            action,
4710            address,
4711        });
4712    }
4713    if rules.len()
4714        > ctx
4715            .catalog
4716            .system_vars()
4717            .max_rules_per_network_policy()
4718            .try_into()?
4719    {
4720        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4721    }
4722
4723    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4724        id: policy.id(),
4725        name: normalize::ident(name),
4726        rules,
4727    }))
4728}
4729
4730pub fn describe_create_cluster(
4731    _: &StatementContext,
4732    _: CreateClusterStatement<Aug>,
4733) -> Result<StatementDesc, PlanError> {
4734    Ok(StatementDesc::new(None))
4735}
4736
4737// WARNING:
4738// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4739// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4740// that option stays the same. If you were to give a default value here, then not giving that option
4741// to ALTER CLUSTER would always reset the value of that option to the default.
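    // For example (hypothetical): if SIZE defaulted to '100cc' here, then
    // `ALTER CLUSTER c SET (REPLICATION FACTOR 2)` would also silently reset the
    // cluster's size to '100cc' instead of leaving it unchanged.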
4742generate_extracted_config!(
4743    ClusterOption,
4744    (AvailabilityZones, Vec<String>),
4745    (Disk, bool),
4746    (IntrospectionDebugging, bool),
4747    (IntrospectionInterval, OptionalDuration),
4748    (Managed, bool),
4749    (Replicas, Vec<ReplicaDefinition<Aug>>),
4750    (ReplicationFactor, u32),
4751    (Size, String),
4752    (Schedule, ClusterScheduleOptionValue),
4753    (WorkloadClass, OptionalString)
4754);
4755
4756generate_extracted_config!(
4757    NetworkPolicyOption,
4758    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4759);
4760
4761generate_extracted_config!(
4762    NetworkPolicyRuleOption,
4763    (Direction, String),
4764    (Action, String),
4765    (Address, String)
4766);
4767
4768generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4769
4770generate_extracted_config!(
4771    ClusterAlterUntilReadyOption,
4772    (Timeout, Duration),
4773    (OnTimeout, String)
4774);
4775
4776generate_extracted_config!(
4777    ClusterFeature,
4778    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4779    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4780    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4781    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4782    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4783    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4784    (
4785        EnableProjectionPushdownAfterRelationCse,
4786        Option<bool>,
4787        Default(None)
4788    )
4789);
4790
4791/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4792///
4793/// The reverse of [`unplan_create_cluster`].
4794pub fn plan_create_cluster(
4795    scx: &StatementContext,
4796    stmt: CreateClusterStatement<Aug>,
4797) -> Result<Plan, PlanError> {
4798    let plan = plan_create_cluster_inner(scx, stmt)?;
4799
4800    // Roundtrip through unplan and make sure that we end up with the same plan.
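    // This guards against drift between `plan_create_cluster_inner` and
    // `unplan_create_cluster`: an option dropped or altered during unplanning
    // surfaces here as a `PlanError::Replan` rather than as silent corruption.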
4801    if let CreateClusterVariant::Managed(_) = &plan.variant {
4802        let stmt = unplan_create_cluster(scx, plan.clone())
4803            .map_err(|e| PlanError::Replan(e.to_string()))?;
4804        let create_sql = stmt.to_ast_string_stable();
4805        let stmt = parse::parse(&create_sql)
4806            .map_err(|e| PlanError::Replan(e.to_string()))?
4807            .into_element()
4808            .ast;
4809        let (stmt, _resolved_ids) =
4810            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4811        let stmt = match stmt {
4812            Statement::CreateCluster(stmt) => stmt,
4813            stmt => {
4814                return Err(PlanError::Replan(format!(
4815                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4816                )));
4817            }
4818        };
4819        let replan =
4820            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4821        if plan != replan {
4822            return Err(PlanError::Replan(format!(
4823                "replan does not match: plan={plan:?}, replan={replan:?}"
4824            )));
4825        }
4826    }
4827
4828    Ok(Plan::CreateCluster(plan))
4829}
4830
4831pub fn plan_create_cluster_inner(
4832    scx: &StatementContext,
4833    CreateClusterStatement {
4834        name,
4835        options,
4836        features,
4837    }: CreateClusterStatement<Aug>,
4838) -> Result<CreateClusterPlan, PlanError> {
4839    let ClusterOptionExtracted {
4840        availability_zones,
4841        introspection_debugging,
4842        introspection_interval,
4843        managed,
4844        replicas,
4845        replication_factor,
4846        seen: _,
4847        size,
4848        disk,
4849        schedule,
4850        workload_class,
4851    }: ClusterOptionExtracted = options.try_into()?;
4852
4853    let managed = managed.unwrap_or_else(|| replicas.is_none());
4854
4855    if !scx.catalog.active_role_id().is_system() {
4856        if !features.is_empty() {
4857            sql_bail!("FEATURES not supported for non-system users");
4858        }
4859        if workload_class.is_some() {
4860            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4861        }
4862    }
4863
4864    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4865    let workload_class = workload_class.and_then(|v| v.0);
4866
4867    if managed {
4868        if replicas.is_some() {
4869            sql_bail!("REPLICAS not supported for managed clusters");
4870        }
4871        let Some(size) = size else {
4872            sql_bail!("SIZE must be specified for managed clusters");
4873        };
4874
4875        if disk.is_some() {
4876            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4877            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4878            // we'll be able to remove the `DISK` option entirely.
4879            if scx.catalog.is_cluster_size_cc(&size) {
4880                sql_bail!(
4881                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4882                );
4883            }
4884
4885            scx.catalog
4886                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4887        }
4888
4889        let compute = plan_compute_replica_config(
4890            introspection_interval,
4891            introspection_debugging.unwrap_or(false),
4892        )?;
4893
4894        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4895            replication_factor.unwrap_or_else(|| {
4896                scx.catalog
4897                    .system_vars()
4898                    .default_cluster_replication_factor()
4899            })
4900        } else {
4901            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4902            if replication_factor.is_some() {
4903                sql_bail!(
4904                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4905                );
4906            }
4907            // If we have a non-trivial schedule, start with no replicas, to avoid
4908            // quickly going back and forth if the schedule doesn't immediately want a
4909            // replica.
4910            0
4911        };
4912        let availability_zones = availability_zones.unwrap_or_default();
4913
4914        if !availability_zones.is_empty() {
4915            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4916        }
4917
4918        // Plan OptimizerFeatureOverrides.
4919        let ClusterFeatureExtracted {
4920            reoptimize_imported_views,
4921            enable_eager_delta_joins,
4922            enable_new_outer_join_lowering,
4923            enable_variadic_left_join_lowering,
4924            enable_letrec_fixpoint_analysis,
4925            enable_join_prioritize_arranged,
4926            enable_projection_pushdown_after_relation_cse,
4927            seen: _,
4928        } = ClusterFeatureExtracted::try_from(features)?;
4929        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4930            reoptimize_imported_views,
4931            enable_eager_delta_joins,
4932            enable_new_outer_join_lowering,
4933            enable_variadic_left_join_lowering,
4934            enable_letrec_fixpoint_analysis,
4935            enable_join_prioritize_arranged,
4936            enable_projection_pushdown_after_relation_cse,
4937            ..Default::default()
4938        };
4939
4940        let schedule = plan_cluster_schedule(schedule)?;
4941
4942        Ok(CreateClusterPlan {
4943            name: normalize::ident(name),
4944            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4945                replication_factor,
4946                size,
4947                availability_zones,
4948                compute,
4949                optimizer_feature_overrides,
4950                schedule,
4951            }),
4952            workload_class,
4953        })
4954    } else {
4955        let Some(replica_defs) = replicas else {
4956            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4957        };
4958        if availability_zones.is_some() {
4959            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4960        }
4961        if replication_factor.is_some() {
4962            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4963        }
4964        if introspection_debugging.is_some() {
4965            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4966        }
4967        if introspection_interval.is_some() {
4968            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4969        }
4970        if size.is_some() {
4971            sql_bail!("SIZE not supported for unmanaged clusters");
4972        }
4973        if disk.is_some() {
4974            sql_bail!("DISK not supported for unmanaged clusters");
4975        }
4976        if !features.is_empty() {
4977            sql_bail!("FEATURES not supported for unmanaged clusters");
4978        }
4979        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4980            sql_bail!(
4981                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4982            );
4983        }
4984
4985        let mut replicas = vec![];
4986        for ReplicaDefinition { name, options } in replica_defs {
4987            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4988        }
4989
4990        Ok(CreateClusterPlan {
4991            name: normalize::ident(name),
4992            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4993            workload_class,
4994        })
4995    }
4996}
4997
4998/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4999///
5000/// The reverse of [`plan_create_cluster`].
5001pub fn unplan_create_cluster(
5002    scx: &StatementContext,
5003    CreateClusterPlan {
5004        name,
5005        variant,
5006        workload_class,
5007    }: CreateClusterPlan,
5008) -> Result<CreateClusterStatement<Aug>, PlanError> {
5009    match variant {
5010        CreateClusterVariant::Managed(CreateClusterManagedPlan {
5011            replication_factor,
5012            size,
5013            availability_zones,
5014            compute,
5015            optimizer_feature_overrides,
5016            schedule,
5017        }) => {
5018            let schedule = unplan_cluster_schedule(schedule);
5019            let OptimizerFeatureOverrides {
5020                enable_guard_subquery_tablefunc: _,
5021                enable_consolidate_after_union_negate: _,
5022                enable_reduce_mfp_fusion: _,
5023                enable_cardinality_estimates: _,
5024                persist_fast_path_limit: _,
5025                reoptimize_imported_views,
5026                enable_eager_delta_joins,
5027                enable_new_outer_join_lowering,
5028                enable_variadic_left_join_lowering,
5029                enable_letrec_fixpoint_analysis,
5030                enable_reduce_reduction: _,
5031                enable_join_prioritize_arranged,
5032                enable_projection_pushdown_after_relation_cse,
5033                enable_less_reduce_in_eqprop: _,
5034                enable_dequadratic_eqprop_map: _,
5035                enable_eq_classes_withholding_errors: _,
5036                enable_fast_path_plan_insights: _,
5037                enable_cast_elimination: _,
5038            } = optimizer_feature_overrides;
5039            // Fields destructured above but not listed below are not wired up to cluster features.
5040            let features_extracted = ClusterFeatureExtracted {
5041                // Seen is ignored when unplanning.
5042                seen: Default::default(),
5043                reoptimize_imported_views,
5044                enable_eager_delta_joins,
5045                enable_new_outer_join_lowering,
5046                enable_variadic_left_join_lowering,
5047                enable_letrec_fixpoint_analysis,
5048                enable_join_prioritize_arranged,
5049                enable_projection_pushdown_after_relation_cse,
5050            };
5051            let features = features_extracted.into_values(scx.catalog);
5052            let availability_zones = if availability_zones.is_empty() {
5053                None
5054            } else {
5055                Some(availability_zones)
5056            };
5057            let (introspection_interval, introspection_debugging) =
5058                unplan_compute_replica_config(compute);
5059            // Replication factor cannot be explicitly specified with a refresh schedule;
5060            // it is always 1 or less.
5061            let replication_factor = match &schedule {
5062                ClusterScheduleOptionValue::Manual => Some(replication_factor),
5063                ClusterScheduleOptionValue::Refresh { .. } => {
5064                    assert!(
5065                        replication_factor <= 1,
5066                        "replication factor, {replication_factor:?}, must be <= 1"
5067                    );
5068                    None
5069                }
5070            };
5071            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
5072            let options_extracted = ClusterOptionExtracted {
5073                // Seen is ignored when unplanning.
5074                seen: Default::default(),
5075                availability_zones,
5076                disk: None,
5077                introspection_debugging: Some(introspection_debugging),
5078                introspection_interval,
5079                managed: Some(true),
5080                replicas: None,
5081                replication_factor,
5082                size: Some(size),
5083                schedule: Some(schedule),
5084                workload_class,
5085            };
5086            let options = options_extracted.into_values(scx.catalog);
5087            let name = Ident::new_unchecked(name);
5088            Ok(CreateClusterStatement {
5089                name,
5090                options,
5091                features,
5092            })
5093        }
5094        CreateClusterVariant::Unmanaged(_) => {
5095            bail_unsupported!("SHOW CREATE for unmanaged clusters")
5096        }
5097    }
5098}
5099
5100generate_extracted_config!(
5101    ReplicaOption,
5102    (AvailabilityZone, String),
5103    (BilledAs, String),
5104    (ComputeAddresses, Vec<String>),
5105    (ComputectlAddresses, Vec<String>),
5106    (Disk, bool),
5107    (Internal, bool, Default(false)),
5108    (IntrospectionDebugging, bool, Default(false)),
5109    (IntrospectionInterval, OptionalDuration),
5110    (Size, String),
5111    (StorageAddresses, Vec<String>),
5112    (StoragectlAddresses, Vec<String>),
5113    (Workers, u16)
5114);
5115
5116fn plan_replica_config(
5117    scx: &StatementContext,
5118    options: Vec<ReplicaOption<Aug>>,
5119) -> Result<ReplicaConfig, PlanError> {
5120    let ReplicaOptionExtracted {
5121        availability_zone,
5122        billed_as,
5123        computectl_addresses,
5124        disk,
5125        internal,
5126        introspection_debugging,
5127        introspection_interval,
5128        size,
5129        storagectl_addresses,
5130        ..
5131    }: ReplicaOptionExtracted = options.try_into()?;
5132
5133    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
5134
5135    match (
5136        size,
5137        availability_zone,
5138        billed_as,
5139        storagectl_addresses,
5140        computectl_addresses,
5141    ) {
5142        // Common cases we expect end users to hit.
5143        (None, _, None, None, None) => {
5144            // We don't mention the unmanaged options in the error message
5145            // because they are only available in unsafe mode.
5146            sql_bail!("SIZE option must be specified");
5147        }
5148        (Some(size), availability_zone, billed_as, None, None) => {
5149            if disk.is_some() {
5150                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
5151                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
5152                // we'll be able to remove the `DISK` option entirely.
5153                if scx.catalog.is_cluster_size_cc(&size) {
5154                    sql_bail!(
5155                        "DISK option not supported for modern cluster sizes because disk is always enabled"
5156                    );
5157                }
5158
5159                scx.catalog
5160                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5161            }
5162
5163            Ok(ReplicaConfig::Orchestrated {
5164                size,
5165                availability_zone,
5166                compute,
5167                billed_as,
5168                internal,
5169            })
5170        }
5171
5172        (None, None, None, storagectl_addresses, computectl_addresses) => {
5173            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5174
5175            // When manually testing Materialize in unsafe mode, it's easy to
5176            // accidentally omit one of these options, so we try to produce
5177            // helpful error messages.
5178            let Some(storagectl_addrs) = storagectl_addresses else {
5179                sql_bail!("missing STORAGECTL ADDRESSES option");
5180            };
5181            let Some(computectl_addrs) = computectl_addresses else {
5182                sql_bail!("missing COMPUTECTL ADDRESSES option");
5183            };
5184
5185            if storagectl_addrs.len() != computectl_addrs.len() {
5186                sql_bail!(
5187                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5188                );
5189            }
5190
5191            if disk.is_some() {
5192                sql_bail!("DISK can't be specified for unorchestrated clusters");
5193            }
5194
5195            Ok(ReplicaConfig::Unorchestrated {
5196                storagectl_addrs,
5197                computectl_addrs,
5198                compute,
5199            })
5200        }
5201        _ => {
5202            // We don't bother trying to produce a more helpful error message
5203            // here because no user is likely to hit this path.
5204            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5205        }
5206    }
5207}
5208
5209/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5210///
5211/// The reverse of [`unplan_compute_replica_config`].
5212fn plan_compute_replica_config(
5213    introspection_interval: Option<OptionalDuration>,
5214    introspection_debugging: bool,
5215) -> Result<ComputeReplicaConfig, PlanError> {
5216    let introspection_interval = introspection_interval
5217        .map(|OptionalDuration(i)| i)
5218        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
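    // (Illustrative) Omitting INTROSPECTION INTERVAL falls back to the default
    // logging interval; an explicitly disabled interval arrives here as
    // `OptionalDuration(None)` and switches introspection off below.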
5219    let introspection = match introspection_interval {
5220        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5221            interval,
5222            debugging: introspection_debugging,
5223        }),
5224        None if introspection_debugging => {
5225            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5226        }
5227        None => None,
5228    };
5229    let compute = ComputeReplicaConfig { introspection };
5230    Ok(compute)
5231}
5232
5233/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5234///
5235/// The reverse of [`plan_compute_replica_config`].
5236fn unplan_compute_replica_config(
5237    compute_replica_config: ComputeReplicaConfig,
5238) -> (Option<OptionalDuration>, bool) {
5239    match compute_replica_config.introspection {
5240        Some(ComputeReplicaIntrospectionConfig {
5241            debugging,
5242            interval,
5243        }) => (Some(OptionalDuration(Some(interval))), debugging),
5244        None => (Some(OptionalDuration(None)), false),
5245    }
5246}
5247
5248/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5249///
5250/// The reverse of [`unplan_cluster_schedule`].
5251fn plan_cluster_schedule(
5252    schedule: ClusterScheduleOptionValue,
5253) -> Result<ClusterSchedule, PlanError> {
5254    Ok(match schedule {
5255        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5256        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5257        ClusterScheduleOptionValue::Refresh {
5258            hydration_time_estimate: None,
5259        } => ClusterSchedule::Refresh {
5260            hydration_time_estimate: Duration::from_millis(0),
5261        },
5262        // Otherwise we convert the `IntervalValue` to a `Duration`.
5263        ClusterScheduleOptionValue::Refresh {
5264            hydration_time_estimate: Some(interval_value),
5265        } => {
5266            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5267            if interval.as_microseconds() < 0 {
5268                sql_bail!(
5269                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5270                    interval
5271                );
5272            }
5273            if interval.months != 0 {
5274                // This limitation exists because we want this interval to be cleanly
5275                // convertible to a unix epoch timestamp difference. When the interval
5276                // involves months, that no longer holds, because months have variable lengths.
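                    // For example, '2 days' or '36 hours' are accepted, while
                    // '1 month' is rejected, since a month has no fixed length.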
5277                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5278            }
5279            let duration = interval.duration()?;
5280            if u64::try_from(duration.as_millis()).is_err()
5281                || Interval::from_duration(&duration).is_err()
5282            {
5283                sql_bail!("HYDRATION TIME ESTIMATE too large");
5284            }
5285            ClusterSchedule::Refresh {
5286                hydration_time_estimate: duration,
5287            }
5288        }
5289    })
5290}
5291
5292/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5293///
5294/// The reverse of [`plan_cluster_schedule`].
5295fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5296    match schedule {
5297        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5298        ClusterSchedule::Refresh {
5299            hydration_time_estimate,
5300        } => {
5301            let interval = Interval::from_duration(&hydration_time_estimate)
5302                .expect("planning ensured that this is convertible back to Interval");
5303            let interval_value = literal::unplan_interval(&interval);
5304            ClusterScheduleOptionValue::Refresh {
5305                hydration_time_estimate: Some(interval_value),
5306            }
5307        }
5308    }
5309}
5310
5311pub fn describe_create_cluster_replica(
5312    _: &StatementContext,
5313    _: CreateClusterReplicaStatement<Aug>,
5314) -> Result<StatementDesc, PlanError> {
5315    Ok(StatementDesc::new(None))
5316}
5317
5318pub fn plan_create_cluster_replica(
5319    scx: &StatementContext,
5320    CreateClusterReplicaStatement {
5321        definition: ReplicaDefinition { name, options },
5322        of_cluster,
5323    }: CreateClusterReplicaStatement<Aug>,
5324) -> Result<Plan, PlanError> {
5325    let cluster = scx
5326        .catalog
5327        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5328    let current_replica_count = cluster.replica_ids().iter().count();
5329    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5330        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5331        return Err(PlanError::CreateReplicaFailStorageObjects {
5332            current_replica_count,
5333            internal_replica_count,
5334            hypothetical_replica_count: current_replica_count + 1,
5335        });
5336    }
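    // The check above prevents adding a replica to a cluster that hosts
    // objects which only support a single replica (see
    // `contains_single_replica_objects`).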
5337
5338    let config = plan_replica_config(scx, options)?;
5339
5340    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5341        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5342            return Err(PlanError::MangedReplicaName(name.into_string()));
5343        }
5344    } else {
5345        ensure_cluster_is_not_managed(scx, cluster.id())?;
5346    }
5347
5348    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5349        name: normalize::ident(name),
5350        cluster_id: cluster.id(),
5351        config,
5352    }))
5353}
5354
5355pub fn describe_create_secret(
5356    _: &StatementContext,
5357    _: CreateSecretStatement<Aug>,
5358) -> Result<StatementDesc, PlanError> {
5359    Ok(StatementDesc::new(None))
5360}
5361
5362pub fn plan_create_secret(
5363    scx: &StatementContext,
5364    stmt: CreateSecretStatement<Aug>,
5365) -> Result<Plan, PlanError> {
5366    let CreateSecretStatement {
5367        name,
5368        if_not_exists,
5369        value,
5370    } = &stmt;
5371
5372    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
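    // Redact the secret's value before recording the statement, so the stored
    // `create_sql` never contains the plaintext secret.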
5373    let mut create_sql_statement = stmt.clone();
5374    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5375    let create_sql =
5376        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5377    let secret_as = query::plan_secret_as(scx, value.clone())?;
5378
5379    let secret = Secret {
5380        create_sql,
5381        secret_as,
5382    };
5383
5384    Ok(Plan::CreateSecret(CreateSecretPlan {
5385        name,
5386        secret,
5387        if_not_exists: *if_not_exists,
5388    }))
5389}
5390
5391pub fn describe_create_connection(
5392    _: &StatementContext,
5393    _: CreateConnectionStatement<Aug>,
5394) -> Result<StatementDesc, PlanError> {
5395    Ok(StatementDesc::new(None))
5396}
5397
5398generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5399
5400pub fn plan_create_connection(
5401    scx: &StatementContext,
5402    mut stmt: CreateConnectionStatement<Aug>,
5403) -> Result<Plan, PlanError> {
5404    let CreateConnectionStatement {
5405        name,
5406        connection_type,
5407        values,
5408        if_not_exists,
5409        with_options,
5410    } = stmt.clone();
5411    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5412    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5413    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5414
5415    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5416    if options.validate.is_some() {
5417        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5418    }
5419    let validate = match options.validate {
5420        Some(val) => val,
5421        None => {
5422            scx.catalog
5423                .system_vars()
5424                .enable_default_connection_validation()
5425                && details.to_connection().validate_by_default()
5426        }
5427    };
5428
5429    // Check for an object in the catalog with this same name
5430    let full_name = scx.catalog.resolve_full_name(&name);
5431    let partial_name = PartialItemName::from(full_name.clone());
5432    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5433        return Err(PlanError::ItemAlreadyExists {
5434            name: full_name.to_string(),
5435            item_type: item.item_type(),
5436        });
5437    }
5438
5439    // For SSH connections, overwrite the public key options based on the
5440    // connection details, in case we generated new keys during planning.
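    // This keeps the `create_sql` recorded below consistent with the key pair
    // actually stored in the connection details.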
5441    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5442        stmt.values.retain(|v| {
5443            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5444        });
5445        stmt.values.push(ConnectionOption {
5446            name: ConnectionOptionName::PublicKey1,
5447            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5448        });
5449        stmt.values.push(ConnectionOption {
5450            name: ConnectionOptionName::PublicKey2,
5451            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5452        });
5453    }
5454    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5455
5456    let plan = CreateConnectionPlan {
5457        name,
5458        if_not_exists,
5459        connection: crate::plan::Connection {
5460            create_sql,
5461            details,
5462        },
5463        validate,
5464    };
5465    Ok(Plan::CreateConnection(plan))
5466}
5467
5468fn plan_drop_database(
5469    scx: &StatementContext,
5470    if_exists: bool,
5471    name: &UnresolvedDatabaseName,
5472    cascade: bool,
5473) -> Result<Option<DatabaseId>, PlanError> {
5474    Ok(match resolve_database(scx, name, if_exists)? {
5475        Some(database) => {
5476            if !cascade && database.has_schemas() {
5477                sql_bail!(
5478                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5479                    name,
5480                );
5481            }
5482            Some(database.id())
5483        }
5484        None => None,
5485    })
5486}
5487
5488pub fn describe_drop_objects(
5489    _: &StatementContext,
5490    _: DropObjectsStatement,
5491) -> Result<StatementDesc, PlanError> {
5492    Ok(StatementDesc::new(None))
5493}
5494
pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}

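// For illustration (placeholder names):
//
//     DROP SCHEMA db1.sch1 CASCADE;
//
// Ambient (system) schemas and the temporary schema are rejected below, and a
// non-CASCADE drop fails while the schema still contains objects.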
fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

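// For illustration (placeholder name):
//
//     DROP ROLE intern;
//
// A role cannot be dropped while it is the active role or while it is the
// grantor of any outstanding role membership.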
fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

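// For illustration (placeholder name):
//
//     DROP CLUSTER compute1 CASCADE;
//
// Without CASCADE, the drop fails while objects are still bound to the
// cluster.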
fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            // TODO(network_policy): When we support role based network policies, check if any role
            // currently has the specified policy set.
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns `true` if the cluster has any object that requires a single replica.
/// Returns `false` if the cluster has no objects.
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    // If this feature is enabled, then all objects support multiple replicas.
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        // Otherwise we check for the existence of sources or sinks.
        cluster.bound_objects().iter().any(|id| {
            let item = scx.catalog.get_item(id);
            matches!(
                item.item_type(),
                CatalogItemType::Sink | CatalogItemType::Source
            )
        })
    }
}

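// For illustration, replica names are qualified by their cluster (placeholder
// names):
//
//     DROP CLUSTER REPLICA compute1.r1;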
fn plan_drop_cluster_replica(
    scx: &StatementContext,
    if_exists: bool,
    name: &QualifiedReplica,
) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
}

/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `DROP VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
                //  relies on entry. Unfortunately, we don't have that information readily available.
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}

/// Does the dependency `dep` prevent a non-cascading drop of an object of type
/// `object_type`?
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        _ => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            CatalogItemType::Index => false,
        },
    }
}

pub fn describe_alter_index_options(
    _: &StatementContext,
    _: AlterIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_drop_owned(
    _: &StatementContext,
    _: DropOwnedStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

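// For illustration (placeholder role names):
//
//     DROP OWNED BY staff, interns CASCADE;
//
// This walks every replica, cluster, item, schema, and database in the
// catalog, dropping the objects owned by the listed roles and revoking any
// privileges granted to them, including default privileges.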
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Replicas
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            // Note: CASCADE is not required for replicas.
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                // Checks if any items still depend on this one, returning an error if so.
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                // When this item gets dropped it will also drop its progress source, so we need to
                // check the users of those.
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}

fn plan_retain_history_option(
    scx: &StatementContext,
    retain_history: Option<OptionalDuration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    if let Some(OptionalDuration(lcw)) = retain_history {
        Ok(Some(plan_retain_history(scx, lcw)?))
    } else {
        Ok(None)
    }
}

// Convert a specified RETAIN HISTORY option into a compaction window. `None`
// corresponds to `DisableCompaction`. A zero duration errors: the
// `OptionalDuration` type already converts a written zero into `None`, so a
// literal zero reaching this function indicates an internal error. This
// function must not be called in the `RESET (RETAIN HISTORY)` path, which
// should be handled by the outer `Option<OptionalDuration>` being `None`.
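//
// For illustration, the behavior of the match below (durations are
// placeholders):
//
//     RETAIN HISTORY FOR '30 days'       -> 30-day compaction window
//     RETAIN HISTORY FOR '1 millisecond' -> error unless
//                                           ENABLE_UNLIMITED_RETAIN_HISTORY is
//                                           set (below the default window)
//     RETAIN HISTORY FOR '0'             -> `lcw` of `None`: DisableCompaction,
//                                           also gated behind
//                                           ENABLE_UNLIMITED_RETAIN_HISTORY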
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
        // disable compaction), and should never occur here. Furthermore, some things actually do
        // break when this is set to real zero:
        // https://github.com/MaterializeInc/database-issues/issues/3798.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Error if the duration is low and enable_unlimited_retain_history is not set (which
            // should only be possible during testing).
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction is
        // a dangerous choice, so gate it behind the unlimited-retain-history flag.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        // Index options are not durable.
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

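// For illustration, the WITH options accepted on CREATE TABLE (placeholder
// names; PARTITION BY is feature-gated):
//
//     CREATE TABLE t (a int, b text) WITH (
//         PARTITION BY (a),
//         RETAIN HISTORY FOR '90 days'
//     );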
fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

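// For illustration, the two forms handled below (placeholder names):
//
//     ALTER INDEX i SET (RETAIN HISTORY FOR '1hr');
//     ALTER INDEX i RESET (RETAIN HISTORY);
//
// RETAIN HISTORY is currently the only supported index option, so supplying
// more than one option is rejected.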
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

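// For illustration, typical managed-cluster alterations handled below
// (placeholder names and sizes):
//
//     ALTER CLUSTER compute1 SET (SIZE '200cc', REPLICATION FACTOR 2);
//     ALTER CLUSTER compute1 RESET (REPLICATION FACTOR);
//
// The optional `WITH (...)` clause selects an alter strategy for zero-downtime
// reconfiguration, which is gated behind a feature flag.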
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() && workload_class.is_some() {
                sql_bail!("WORKLOAD CLASS not supported for non-system users");
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        // The total number of replicas that would be running is
                        // the internal replicas plus the replication factor.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // Alter strategies other than `None` stand up pending replicas of the new
                        // configuration, which would violate the single-replica constraint for
                        // sources and sinks. If the cluster contains any such storage objects,
                        // just fail.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system()
                && reset_options.contains(&WorkloadClass)
            {
                sql_bail!("WORKLOAD CLASS not supported for non-system users");
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

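// For illustration, the only currently supported form (placeholder names;
// gated behind ENABLE_ALTER_SET_CLUSTER):
//
//     ALTER MATERIALIZED VIEW mv SET CLUSTER other_cluster;
//
// Moving an object onto the cluster it already lives on plans a no-op.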
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Prevent access to `SET CLUSTER` for unsupported objects.
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

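// For illustration, renames dispatch on the object type (placeholder names):
//
//     ALTER TABLE t RENAME TO t2;
//     ALTER CLUSTER compute1 RENAME TO compute2;
//
// The new name is a bare identifier; the object stays in its current database
// and schema.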
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!(
            "cannot rename schemas in the ambient database: {:?}",
            mz_repr::namespaces::MZ_TEMP_SCHEMA
        );
    }

    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Make sure the name is unique.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    // Prevent users from renaming system related schemas.
    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

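// For illustration, a swap statement of this shape (placeholder names; gated
// behind ENABLE_ALTER_SWAP):
//
//     ALTER SCHEMA blue SWAP WITH green;
//
// The plan carries a uniquely named temporary schema name so the swap can be
// carried out as renames through it, e.g. blue -> mz_schema_swap_<suffix>,
// green -> blue, temporary name -> green.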
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    // Also check name_b (the target schema name).
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // We cannot swap system schemas.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary name we can swap schema_a to.
    //
    // 'check' returns if the temp schema name would be valid.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `ALTER VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // For PostgreSQL compatibility, items and types cannot have
            // overlapping names in a variety of situations. See the comment on
            // `CatalogItemType::conflicts_with_type` for details.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

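// For illustration (placeholder names; gated behind ENABLE_ALTER_SWAP):
//
//     ALTER CLUSTER blue SWAP WITH green;
//
// As with schema swaps, the plan routes the renames through a uniquely named
// temporary cluster name.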
6744pub fn plan_alter_cluster_swap<F>(
6745    scx: &mut StatementContext,
6746    name_a: Ident,
6747    name_b: Ident,
6748    gen_temp_suffix: F,
6749) -> Result<Plan, PlanError>
6750where
6751    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6752{
6753    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6754    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6755
6756    let check = |temp_suffix: &str| {
6757        let mut temp_name = ident!("mz_schema_swap_");
6758        temp_name.append_lossy(temp_suffix);
6759        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6760            // Temp name does not exist, so we can use it.
6761            Err(CatalogError::UnknownCluster(_)) => true,
6762            // Temp name already exists!
6763            Ok(_) | Err(_) => false,
6764        }
6765    };
6766    let temp_suffix = gen_temp_suffix(&check)?;
6767    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6768
6769    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6770        id_a: cluster_a.id(),
6771        id_b: cluster_b.id(),
6772        name_a: name_a.into_string(),
6773        name_b: name_b.into_string(),
6774        name_temp,
6775    }))
6776}
6777
6778pub fn plan_alter_cluster_replica_rename(
6779    scx: &mut StatementContext,
6780    object_type: ObjectType,
6781    name: QualifiedReplica,
6782    to_item_name: Ident,
6783    if_exists: bool,
6784) -> Result<Plan, PlanError> {
6785    match resolve_cluster_replica(scx, &name, if_exists)? {
6786        Some((cluster, replica)) => {
6787            ensure_cluster_is_not_managed(scx, cluster.id())?;
6788            Ok(Plan::AlterClusterReplicaRename(
6789                AlterClusterReplicaRenamePlan {
6790                    cluster_id: cluster.id(),
6791                    replica_id: replica,
6792                    name: QualifiedReplica {
6793                        cluster: Ident::new(cluster.name())?,
6794                        replica: name.replica,
6795                    },
6796                    to_name: normalize::ident(to_item_name),
6797                },
6798            ))
6799        }
6800        None => {
6801            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6802                name: name.to_ast_string_simple(),
6803                object_type,
6804            });
6805
6806            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6807        }
6808    }
6809}
6810
6811pub fn describe_alter_object_swap(
6812    _: &StatementContext,
6813    _: AlterObjectSwapStatement,
6814) -> Result<StatementDesc, PlanError> {
6815    Ok(StatementDesc::new(None))
6816}
6817
6818pub fn plan_alter_object_swap(
6819    scx: &mut StatementContext,
6820    stmt: AlterObjectSwapStatement,
6821) -> Result<Plan, PlanError> {
6822    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6823
6824    let AlterObjectSwapStatement {
6825        object_type,
6826        name_a,
6827        name_b,
6828    } = stmt;
6829    let object_type = object_type.into();
6830
6831    // We'll try 10 times to generate a temporary suffix.
6832    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6833        let mut attempts = 0;
6834        let name_temp = loop {
6835            attempts += 1;
6836            if attempts > 10 {
6837                tracing::warn!("Unable to generate temp id for swapping");
6838                sql_bail!("unable to swap!");
6839            }
6840
6841            // Call the provided closure to make sure this name is unique!
6842            let short_id = mz_ore::id_gen::temp_id();
6843            if check_fn(&short_id) {
6844                break short_id;
6845            }
6846        };
6847
6848        Ok(name_temp)
6849    };
6850
6851    match (object_type, name_a, name_b) {
6852        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6853            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6854        }
6855        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6856            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6857        }
6858        (object_type, _, _) => Err(PlanError::Unsupported {
6859            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6860            discussion_no: None,
6861        }),
6862    }
6863}
6864
6865pub fn describe_alter_retain_history(
6866    _: &StatementContext,
6867    _: AlterRetainHistoryStatement<Aug>,
6868) -> Result<StatementDesc, PlanError> {
6869    Ok(StatementDesc::new(None))
6870}
6871
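/// Plans `ALTER ... RETAIN HISTORY`. Illustrative examples, assuming a
/// hypothetical materialized view `mv`:
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '1hr');
/// ALTER MATERIALIZED VIEW mv RESET (RETAIN HISTORY);
/// ```
///
/// Resetting falls back to the default logical compaction window.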
6872pub fn plan_alter_retain_history(
6873    scx: &StatementContext,
6874    AlterRetainHistoryStatement {
6875        object_type,
6876        if_exists,
6877        name,
6878        history,
6879    }: AlterRetainHistoryStatement<Aug>,
6880) -> Result<Plan, PlanError> {
6881    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6882}
6883
6884fn alter_retain_history(
6885    scx: &StatementContext,
6886    object_type: ObjectType,
6887    if_exists: bool,
6888    name: UnresolvedObjectName,
6889    history: Option<WithOptionValue<Aug>>,
6890) -> Result<Plan, PlanError> {
6891    let name = match (object_type, name) {
6892        (
6893            // View gets a special error below.
6894            ObjectType::View
6895            | ObjectType::MaterializedView
6896            | ObjectType::Table
6897            | ObjectType::Source
6898            | ObjectType::Index,
6899            UnresolvedObjectName::Item(name),
6900        ) => name,
6901        (object_type, _) => {
6902            sql_bail!("{object_type} does not support RETAIN HISTORY")
6903        }
6904    };
6905    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6906        Some(entry) => {
6907            let full_name = scx.catalog.resolve_full_name(entry.name());
6908            let item_type = entry.item_type();
6909
6910            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6911            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6912                return Err(PlanError::AlterViewOnMaterializedView(
6913                    full_name.to_string(),
6914                ));
6915            } else if object_type == ObjectType::View {
6916                sql_bail!("{object_type} does not support RETAIN HISTORY")
6917            } else if object_type != item_type {
6918                sql_bail!(
6919                    "\"{}\" is a {} not a {}",
6920                    full_name,
6921                    entry.item_type(),
6922                    format!("{object_type}").to_lowercase()
6923                )
6924            }
6925
6926            // Save the original value so we can write it back into the item's persisted create_sql.
6927            let (value, lcw) = match &history {
6928                Some(WithOptionValue::RetainHistoryFor(value)) => {
6929                    let window = OptionalDuration::try_from_value(value.clone())?;
6930                    (Some(value.clone()), window.0)
6931                }
6932                // `None` means RESET, so use the default compaction window.
6933                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6934                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6935            };
6936            let window = plan_retain_history(scx, lcw)?;
6937
6938            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6939                id: entry.id(),
6940                value,
6941                window,
6942                object_type,
6943            }))
6944        }
6945        None => {
6946            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6947                name: name.to_ast_string_simple(),
6948                object_type,
6949            });
6950
6951            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6952        }
6953    }
6954}
6955
6956pub fn describe_alter_secret_options(
6957    _: &StatementContext,
6958    _: AlterSecretStatement<Aug>,
6959) -> Result<StatementDesc, PlanError> {
6960    Ok(StatementDesc::new(None))
6961}
6962
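/// Plans `ALTER SECRET ... AS ...`, which replaces a secret's contents. For
/// example (hypothetical secret name):
///
/// ```sql
/// ALTER SECRET kafka_password AS 'new-password';
/// ```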
6963pub fn plan_alter_secret(
6964    scx: &mut StatementContext,
6965    stmt: AlterSecretStatement<Aug>,
6966) -> Result<Plan, PlanError> {
6967    let AlterSecretStatement {
6968        name,
6969        if_exists,
6970        value,
6971    } = stmt;
6972    let object_type = ObjectType::Secret;
6973    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6974        Some(entry) => entry.id(),
6975        None => {
6976            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6977                name: name.to_string(),
6978                object_type,
6979            });
6980
6981            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6982        }
6983    };
6984
6985    let secret_as = query::plan_secret_as(scx, value)?;
6986
6987    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6988}
6989
6990pub fn describe_alter_connection(
6991    _: &StatementContext,
6992    _: AlterConnectionStatement<Aug>,
6993) -> Result<StatementDesc, PlanError> {
6994    Ok(StatementDesc::new(None))
6995}
6996
6997generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6998
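/// Plans `ALTER CONNECTION`. Illustrative examples (hypothetical connection
/// names and option values):
///
/// ```sql
/// ALTER CONNECTION kafka_conn SET (BROKER 'broker:9092') WITH (VALIDATE = true);
/// ALTER CONNECTION ssh_conn ROTATE KEYS;
/// ```
///
/// `ROTATE KEYS` applies only to SSH connections and cannot be combined with
/// any other action or `WITH` option.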
6999pub fn plan_alter_connection(
7000    scx: &StatementContext,
7001    stmt: AlterConnectionStatement<Aug>,
7002) -> Result<Plan, PlanError> {
7003    let AlterConnectionStatement {
7004        name,
7005        if_exists,
7006        actions,
7007        with_options,
7008    } = stmt;
7009    let conn_name = normalize::unresolved_item_name(name)?;
7010    let entry = match scx.catalog.resolve_item(&conn_name) {
7011        Ok(entry) => entry,
7012        Err(_) if if_exists => {
7013            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7014                name: conn_name.to_string(),
7015                object_type: ObjectType::Connection,
7016            });
7017
7018            return Ok(Plan::AlterNoop(AlterNoopPlan {
7019                object_type: ObjectType::Connection,
7020            }));
7021        }
7022        Err(e) => return Err(e.into()),
7023    };
7024
7025    let connection = entry.connection()?;
7026
7027    if actions
7028        .iter()
7029        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7030    {
7031        if actions.len() > 1 {
7032            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7033        }
7034
7035        if !with_options.is_empty() {
7036            sql_bail!(
7037                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7038                with_options
7039                    .iter()
7040                    .map(|o| o.to_ast_string_simple())
7041                    .join(", ")
7042            );
7043        }
7044
7045        if !matches!(connection, Connection::Ssh(_)) {
7046            sql_bail!(
7047                "{} is not an SSH connection",
7048                scx.catalog.resolve_full_name(entry.name())
7049            )
7050        }
7051
7052        return Ok(Plan::AlterConnection(AlterConnectionPlan {
7053            id: entry.id(),
7054            action: crate::plan::AlterConnectionAction::RotateKeys,
7055        }));
7056    }
7057
7058    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7059    if options.validate.is_some() {
7060        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7061    }
7062
7063    let validate = match options.validate {
7064        Some(val) => val,
7065        None => {
7066            scx.catalog
7067                .system_vars()
7068                .enable_default_connection_validation()
7069                && connection.validate_by_default()
7070        }
7071    };
7072
7073    let connection_type = match connection {
7074        Connection::Aws(_) => CreateConnectionType::Aws,
7075        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7076        Connection::Kafka(_) => CreateConnectionType::Kafka,
7077        Connection::Csr(_) => CreateConnectionType::Csr,
7078        Connection::Postgres(_) => CreateConnectionType::Postgres,
7079        Connection::Ssh(_) => CreateConnectionType::Ssh,
7080        Connection::MySql(_) => CreateConnectionType::MySql,
7081        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7082        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7083    };
7084
7085    // Collect all options irrespective of action taken on them.
7086    let specified_options: BTreeSet<_> = actions
7087        .iter()
7088        .map(|action: &AlterConnectionAction<Aug>| match action {
7089            AlterConnectionAction::SetOption(option) => option.name.clone(),
7090            AlterConnectionAction::DropOption(name) => name.clone(),
7091            AlterConnectionAction::RotateKeys => unreachable!(),
7092        })
7093        .collect();
7094
7095    for invalid in INALTERABLE_OPTIONS {
7096        if specified_options.contains(invalid) {
7097            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7098        }
7099    }
7100
7101    connection::validate_options_per_connection_type(connection_type, specified_options)?;
7102
7103    // Partition operations into set and drop
7104    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
7105        actions.into_iter().partition_map(|action| match action {
7106            AlterConnectionAction::SetOption(option) => Either::Left(option),
7107            AlterConnectionAction::DropOption(name) => Either::Right(name),
7108            AlterConnectionAction::RotateKeys => unreachable!(),
7109        });
7110
7111    let set_options: BTreeMap<_, _> = set_options_vec
7112        .clone()
7113        .into_iter()
7114        .map(|option| (option.name, option.value))
7115        .collect();
7116
7117    // Type check the values and catch duplicates; we don't want to, e.g., let users
7118    // drop and set the same option in the same statement, so treating drops as
7119    // sets here is fine.
7120    let connection_options_extracted =
7121        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7122
7123    let duplicates: Vec<_> = connection_options_extracted
7124        .seen
7125        .intersection(&drop_options)
7126        .collect();
7127
7128    if !duplicates.is_empty() {
7129        sql_bail!(
7130            "cannot both SET and DROP/RESET options {}",
7131            duplicates
7132                .iter()
7133                .map(|option| option.to_string())
7134                .join(", ")
7135        )
7136    }
7137
7138    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7139        let set_options_count = mutually_exclusive_options
7140            .iter()
7141            .filter(|o| set_options.contains_key(o))
7142            .count();
7143        let drop_options_count = mutually_exclusive_options
7144            .iter()
7145            .filter(|o| drop_options.contains(o))
7146            .count();
7147
7148        // Disallow setting _and_ resetting mutually exclusive options
7149        if set_options_count > 0 && drop_options_count > 0 {
7150            sql_bail!(
7151                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7152                connection_type,
7153                mutually_exclusive_options
7154                    .iter()
7155                    .map(|option| option.to_string())
7156                    .join(", ")
7157            )
7158        }
7159
7160        // If any option is either set or dropped, ensure all mutually exclusive
7161        // options are dropped. We do this "behind the scenes", even though we
7162        // disallow users from performing the same action, because this is the
7163        // mechanism by which we overwrite values elsewhere in the code.
7164        if set_options_count > 0 || drop_options_count > 0 {
7165            drop_options.extend(mutually_exclusive_options.iter().cloned());
7166        }
7167
7168        // n.b. if mutually exclusive options are set, those will error when we
7169        // try to replan the connection.
7170    }
7171
7172    Ok(Plan::AlterConnection(AlterConnectionPlan {
7173        id: entry.id(),
7174        action: crate::plan::AlterConnectionAction::AlterOptions {
7175            set_options,
7176            drop_options,
7177            validate,
7178        },
7179    }))
7180}
7181
7182pub fn describe_alter_sink(
7183    _: &StatementContext,
7184    _: AlterSinkStatement<Aug>,
7185) -> Result<StatementDesc, PlanError> {
7186    Ok(StatementDesc::new(None))
7187}
7188
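/// Plans `ALTER SINK`. The only supported action is changing the sink's input
/// relation; a sketch, assuming hypothetical objects `snk` and `new_input`:
///
/// ```sql
/// ALTER SINK snk SET FROM new_input;
/// ```
///
/// Changing the relation re-plans the original `CREATE SINK` statement against
/// the new input and bumps the sink's version.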
7189pub fn plan_alter_sink(
7190    scx: &mut StatementContext,
7191    stmt: AlterSinkStatement<Aug>,
7192) -> Result<Plan, PlanError> {
7193    let AlterSinkStatement {
7194        sink_name,
7195        if_exists,
7196        action,
7197    } = stmt;
7198
7199    let object_type = ObjectType::Sink;
7200    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7201
7202    let Some(item) = item else {
7203        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7204            name: sink_name.to_string(),
7205            object_type,
7206        });
7207
7208        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7209    };
7210    // Always ALTER objects from their latest version.
7211    let item = item.at_version(RelationVersionSelector::Latest);
7212
7213    match action {
7214        AlterSinkAction::ChangeRelation(new_from) => {
7215            // First, reconstruct the original CREATE SINK statement.
7216            let create_sql = item.create_sql();
7217            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7218            let [stmt]: [StatementParseResult; 1] = stmts
7219                .try_into()
7220                .expect("create sql of sink was not exactly one statement");
7221            let Statement::CreateSink(stmt) = stmt.ast else {
7222                unreachable!("invalid create SQL for sink item");
7223            };
7224
7225            // Then, resolve the statement's names and swap in the new `from` relation.
7226            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7227            stmt.from = new_from;
7228
7229            // Finally, re-plan the modified CREATE SINK statement to verify that the new configuration is valid.
7230            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7231                unreachable!("invalid plan for CREATE SINK statement");
7232            };
7233
7234            plan.sink.version += 1;
7235
7236            Ok(Plan::AlterSink(AlterSinkPlan {
7237                item_id: item.id(),
7238                global_id: item.global_id(),
7239                sink: plan.sink,
7240                with_snapshot: plan.with_snapshot,
7241                in_cluster: plan.in_cluster,
7242            }))
7243        }
7244        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7245        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7246    }
7247}
7248
7249pub fn describe_alter_source(
7250    _: &StatementContext,
7251    _: AlterSourceStatement<Aug>,
7252) -> Result<StatementDesc, PlanError> {
7253    // TODO: should the statement's options be described here?
7254    Ok(StatementDesc::new(None))
7255}
7256
7257generate_extracted_config!(
7258    AlterSourceAddSubsourceOption,
7259    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7260    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7261    (Details, String)
7262);
7263
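/// Plans `ALTER SOURCE`. Only `RETAIN HISTORY` can be set or reset here; other
/// options are handled during purification. For example (hypothetical source
/// name):
///
/// ```sql
/// ALTER SOURCE src SET (RETAIN HISTORY FOR '1hr');
/// ALTER SOURCE src RESET (RETAIN HISTORY);
/// ```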
7264pub fn plan_alter_source(
7265    scx: &mut StatementContext,
7266    stmt: AlterSourceStatement<Aug>,
7267) -> Result<Plan, PlanError> {
7268    let AlterSourceStatement {
7269        source_name,
7270        if_exists,
7271        action,
7272    } = stmt;
7273    let object_type = ObjectType::Source;
7274
7275    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7276        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7277            name: source_name.to_string(),
7278            object_type,
7279        });
7280
7281        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7282    }
7283
7284    match action {
7285        AlterSourceAction::SetOptions(options) => {
7286            let mut options = options.into_iter();
7287            let option = options.next().unwrap();
7288            if option.name == CreateSourceOptionName::RetainHistory {
7289                if options.next().is_some() {
7290                    sql_bail!("RETAIN HISTORY must be the only option");
7291                }
7292                return alter_retain_history(
7293                    scx,
7294                    object_type,
7295                    if_exists,
7296                    UnresolvedObjectName::Item(source_name),
7297                    option.value,
7298                );
7299            }
7300            // N.B. we use this statement during purification in a way that cannot be
7301            // planned directly.
7302            sql_bail!(
7303                "Cannot modify the {} of a SOURCE.",
7304                option.name.to_ast_string_simple()
7305            );
7306        }
7307        AlterSourceAction::ResetOptions(reset) => {
7308            let mut options = reset.into_iter();
7309            let option = options.next().unwrap();
7310            if option == CreateSourceOptionName::RetainHistory {
7311                if options.next().is_some() {
7312                    sql_bail!("RETAIN HISTORY must be the only option");
7313                }
7314                return alter_retain_history(
7315                    scx,
7316                    object_type,
7317                    if_exists,
7318                    UnresolvedObjectName::Item(source_name),
7319                    None,
7320                );
7321            }
7322            sql_bail!(
7323                "Cannot modify the {} of a SOURCE.",
7324                option.to_ast_string_simple()
7325            );
7326        }
7327        AlterSourceAction::DropSubsources { .. } => {
7328            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7329        }
7330        AlterSourceAction::AddSubsources { .. } => {
7331            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7332        }
7333        AlterSourceAction::RefreshReferences => {
7334            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7335        }
7336    };
7337}
7338
7339pub fn describe_alter_system_set(
7340    _: &StatementContext,
7341    _: AlterSystemSetStatement,
7342) -> Result<StatementDesc, PlanError> {
7343    Ok(StatementDesc::new(None))
7344}
7345
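/// Plans `ALTER SYSTEM SET`, which assigns a system variable, e.g. (variable
/// name illustrative):
///
/// ```sql
/// ALTER SYSTEM SET max_connections = 100;
/// ```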
7346pub fn plan_alter_system_set(
7347    _: &StatementContext,
7348    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7349) -> Result<Plan, PlanError> {
7350    let name = name.to_string();
7351    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7352        name,
7353        value: scl::plan_set_variable_to(to)?,
7354    }))
7355}
7356
7357pub fn describe_alter_system_reset(
7358    _: &StatementContext,
7359    _: AlterSystemResetStatement,
7360) -> Result<StatementDesc, PlanError> {
7361    Ok(StatementDesc::new(None))
7362}
7363
7364pub fn plan_alter_system_reset(
7365    _: &StatementContext,
7366    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7367) -> Result<Plan, PlanError> {
7368    let name = name.to_string();
7369    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7370}
7371
7372pub fn describe_alter_system_reset_all(
7373    _: &StatementContext,
7374    _: AlterSystemResetAllStatement,
7375) -> Result<StatementDesc, PlanError> {
7376    Ok(StatementDesc::new(None))
7377}
7378
7379pub fn plan_alter_system_reset_all(
7380    _: &StatementContext,
7381    _: AlterSystemResetAllStatement,
7382) -> Result<Plan, PlanError> {
7383    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7384}
7385
7386pub fn describe_alter_role(
7387    _: &StatementContext,
7388    _: AlterRoleStatement<Aug>,
7389) -> Result<StatementDesc, PlanError> {
7390    Ok(StatementDesc::new(None))
7391}
7392
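/// Plans `ALTER ROLE`, which modifies either a role's attributes or its
/// default variable values. Illustrative examples (hypothetical role name):
///
/// ```sql
/// ALTER ROLE analyst WITH INHERIT;
/// ALTER ROLE analyst SET cluster = 'quickstart';
/// ```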
7393pub fn plan_alter_role(
7394    scx: &StatementContext,
7395    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7396) -> Result<Plan, PlanError> {
7397    let option = match option {
7398        AlterRoleOption::Attributes(attrs) => {
7399            let attrs = plan_role_attributes(attrs, scx)?;
7400            PlannedAlterRoleOption::Attributes(attrs)
7401        }
7402        AlterRoleOption::Variable(variable) => {
7403            let var = plan_role_variable(variable)?;
7404            PlannedAlterRoleOption::Variable(var)
7405        }
7406    };
7407
7408    Ok(Plan::AlterRole(AlterRolePlan {
7409        id: name.id,
7410        name: name.name,
7411        option,
7412    }))
7413}
7414
7415pub fn describe_alter_table_add_column(
7416    _: &StatementContext,
7417    _: AlterTableAddColumnStatement<Aug>,
7418) -> Result<StatementDesc, PlanError> {
7419    Ok(StatementDesc::new(None))
7420}
7421
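/// Plans `ALTER TABLE ... ADD COLUMN` (feature-flagged). For example, with a
/// hypothetical table `t`:
///
/// ```sql
/// ALTER TABLE t ADD COLUMN IF NOT EXISTS c text;
/// ```
///
/// New columns are always added as nullable for now; see the TODO on default
/// values below.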
7422pub fn plan_alter_table_add_column(
7423    scx: &StatementContext,
7424    stmt: AlterTableAddColumnStatement<Aug>,
7425) -> Result<Plan, PlanError> {
7426    let AlterTableAddColumnStatement {
7427        if_exists,
7428        name,
7429        if_col_not_exist,
7430        column_name,
7431        data_type,
7432    } = stmt;
7433    let object_type = ObjectType::Table;
7434
7435    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7436
7437    let (relation_id, item_name, desc) =
7438        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7439            Some(item) => {
7440                // Always add columns to the latest version of the item.
7441                let item_name = scx.catalog.resolve_full_name(item.name());
7442                let item = item.at_version(RelationVersionSelector::Latest);
7443                let desc = item.relation_desc().expect("table has desc").into_owned();
7444                (item.id(), item_name, desc)
7445            }
7446            None => {
7447                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7448                    name: name.to_ast_string_simple(),
7449                    object_type,
7450                });
7451                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7452            }
7453        };
7454
7455    let column_name = ColumnName::from(column_name.as_str());
7456    if desc.get_by_name(&column_name).is_some() {
7457        if if_col_not_exist {
7458            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7459                column_name: column_name.to_string(),
7460                object_name: item_name.item,
7461            });
7462            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7463        } else {
7464            return Err(PlanError::ColumnAlreadyExists {
7465                column_name,
7466                object_name: item_name.item,
7467            });
7468        }
7469    }
7470
7471    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7472    // TODO(alter_table): Support non-nullable columns with default values.
7473    let column_type = scalar_type.nullable(true);
7474    // "unresolve" our data type so we can later update the persisted create_sql.
7475    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7476
7477    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7478        relation_id,
7479        column_name,
7480        column_type,
7481        raw_sql_type,
7482    }))
7483}
7484
7485pub fn describe_alter_materialized_view_apply_replacement(
7486    _: &StatementContext,
7487    _: AlterMaterializedViewApplyReplacementStatement,
7488) -> Result<StatementDesc, PlanError> {
7489    Ok(StatementDesc::new(None))
7490}
7491
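/// Plans `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT` (feature-flagged). A
/// hedged sketch, assuming a hypothetical `mv_next` that was created as a
/// replacement targeting `mv`:
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv APPLY REPLACEMENT mv_next;
/// ```
///
/// The replacement must name `mv` as its replacement target or planning fails.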
7492pub fn plan_alter_materialized_view_apply_replacement(
7493    scx: &StatementContext,
7494    stmt: AlterMaterializedViewApplyReplacementStatement,
7495) -> Result<Plan, PlanError> {
7496    let AlterMaterializedViewApplyReplacementStatement {
7497        if_exists,
7498        name,
7499        replacement_name,
7500    } = stmt;
7501
7502    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7503
7504    let object_type = ObjectType::MaterializedView;
7505    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7506        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7507            name: name.to_ast_string_simple(),
7508            object_type,
7509        });
7510        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7511    };
7512
7513    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7514        .expect("if_exists not set");
7515
7516    if replacement.replacement_target() != Some(mv.id()) {
7517        return Err(PlanError::InvalidReplacement {
7518            item_type: mv.item_type(),
7519            item_name: scx.catalog.minimal_qualification(mv.name()),
7520            replacement_type: replacement.item_type(),
7521            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7522        });
7523    }
7524
7525    Ok(Plan::AlterMaterializedViewApplyReplacement(
7526        AlterMaterializedViewApplyReplacementPlan {
7527            id: mv.id(),
7528            replacement_id: replacement.id(),
7529        },
7530    ))
7531}
7532
7533pub fn describe_comment(
7534    _: &StatementContext,
7535    _: CommentStatement<Aug>,
7536) -> Result<StatementDesc, PlanError> {
7537    Ok(StatementDesc::new(None))
7538}
7539
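/// Plans `COMMENT ON`. Illustrative examples (hypothetical names):
///
/// ```sql
/// COMMENT ON TABLE t IS 'fact table';
/// COMMENT ON COLUMN t.c IS 'customer id';
/// ```
///
/// Comments are limited to 1024 characters; column comments are stored against
/// a 1-based column position.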
7540pub fn plan_comment(
7541    scx: &mut StatementContext,
7542    stmt: CommentStatement<Aug>,
7543) -> Result<Plan, PlanError> {
7544    const MAX_COMMENT_LENGTH: usize = 1024;
7545
7546    let CommentStatement { object, comment } = stmt;
7547
7548    // TODO(parkmycar): Make max comment length configurable.
7549    if let Some(c) = &comment {
7550        if c.len() > MAX_COMMENT_LENGTH {
7551            return Err(PlanError::CommentTooLong {
7552                length: c.len(),
7553                max_size: MAX_COMMENT_LENGTH,
7554            });
7555        }
7556    }
7557
7558    let (object_id, column_pos) = match &object {
7559        com_ty @ CommentObjectType::Table { name }
7560        | com_ty @ CommentObjectType::View { name }
7561        | com_ty @ CommentObjectType::MaterializedView { name }
7562        | com_ty @ CommentObjectType::Index { name }
7563        | com_ty @ CommentObjectType::Func { name }
7564        | com_ty @ CommentObjectType::Connection { name }
7565        | com_ty @ CommentObjectType::Source { name }
7566        | com_ty @ CommentObjectType::Sink { name }
7567        | com_ty @ CommentObjectType::Secret { name }
7568        | com_ty @ CommentObjectType::ContinualTask { name } => {
7569            let item = scx.get_item_by_resolved_name(name)?;
7570            match (com_ty, item.item_type()) {
7571                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7572                    (CommentObjectId::Table(item.id()), None)
7573                }
7574                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7575                    (CommentObjectId::View(item.id()), None)
7576                }
7577                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7578                    (CommentObjectId::MaterializedView(item.id()), None)
7579                }
7580                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7581                    (CommentObjectId::Index(item.id()), None)
7582                }
7583                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7584                    (CommentObjectId::Func(item.id()), None)
7585                }
7586                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7587                    (CommentObjectId::Connection(item.id()), None)
7588                }
7589                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7590                    (CommentObjectId::Source(item.id()), None)
7591                }
7592                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7593                    (CommentObjectId::Sink(item.id()), None)
7594                }
7595                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7596                    (CommentObjectId::Secret(item.id()), None)
7597                }
7598                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7599                    (CommentObjectId::ContinualTask(item.id()), None)
7600                }
7601                (com_ty, cat_ty) => {
7602                    let expected_type = match com_ty {
7603                        CommentObjectType::Table { .. } => ObjectType::Table,
7604                        CommentObjectType::View { .. } => ObjectType::View,
7605                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7606                        CommentObjectType::Index { .. } => ObjectType::Index,
7607                        CommentObjectType::Func { .. } => ObjectType::Func,
7608                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7609                        CommentObjectType::Source { .. } => ObjectType::Source,
7610                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7611                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7612                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
7613                    };
7614
7615                    return Err(PlanError::InvalidObjectType {
7616                        expected_type: SystemObjectType::Object(expected_type),
7617                        actual_type: SystemObjectType::Object(cat_ty.into()),
7618                        object_name: item.name().item.clone(),
7619                    });
7620                }
7621            }
7622        }
7623        CommentObjectType::Type { ty } => match ty {
7624            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7625                sql_bail!("cannot comment on anonymous list or map type");
7626            }
7627            ResolvedDataType::Named { id, modifiers, .. } => {
7628                if !modifiers.is_empty() {
7629                    sql_bail!("cannot comment on type with modifiers");
7630                }
7631                (CommentObjectId::Type(*id), None)
7632            }
7633            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7634        },
7635        CommentObjectType::Column { name } => {
7636            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7637            match item.item_type() {
7638                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7639                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7640                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7641                CatalogItemType::MaterializedView => {
7642                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7643                }
7644                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7645                r => {
7646                    return Err(PlanError::Unsupported {
7647                        feature: format!("Specifying comments on a column of {r}"),
7648                        discussion_no: None,
7649                    });
7650                }
7651            }
7652        }
7653        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7654        CommentObjectType::Database { name } => {
7655            (CommentObjectId::Database(*name.database_id()), None)
7656        }
7657        CommentObjectType::Schema { name } => {
7658            // Temporary schemas cannot have comments - they are connection-specific
7659            // and transient. With lazy temporary schema creation, the temp schema
7660            // may not exist yet, but we still need to return the correct error.
7661            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7662                sql_bail!(
7663                    "cannot comment on schema {} because it is a temporary schema",
7664                    mz_repr::namespaces::MZ_TEMP_SCHEMA
7665                );
7666            }
7667            (
7668                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7669                None,
7670            )
7671        }
7672        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7673        CommentObjectType::ClusterReplica { name } => {
7674            let replica = scx.catalog.resolve_cluster_replica(name)?;
7675            (
7676                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7677                None,
7678            )
7679        }
7680        CommentObjectType::NetworkPolicy { name } => {
7681            (CommentObjectId::NetworkPolicy(name.id), None)
7682        }
7683    };
7684
7685    // Note: the `mz_comments` table uses an `Int4` for the column position, but in catalog
7686    // storage we store a `usize`, which would be a `Uint8`. We guard the conversion with a
7687    // checked cast here because it's the easiest place to raise an error.
7688    //
7689    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7690    if let Some(p) = column_pos {
7691        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7692            max_num_columns: MAX_NUM_COLUMNS,
7693            req_num_columns: p,
7694        })?;
7695    }
7696
7697    Ok(Plan::Comment(CommentPlan {
7698        object_id,
7699        sub_component: column_pos,
7700        comment,
7701    }))
7702}
7703
7704pub(crate) fn resolve_cluster<'a>(
7705    scx: &'a StatementContext,
7706    name: &'a Ident,
7707    if_exists: bool,
7708) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7709    match scx.resolve_cluster(Some(name)) {
7710        Ok(cluster) => Ok(Some(cluster)),
7711        Err(_) if if_exists => Ok(None),
7712        Err(e) => Err(e),
7713    }
7714}
7715
7716pub(crate) fn resolve_cluster_replica<'a>(
7717    scx: &'a StatementContext,
7718    name: &QualifiedReplica,
7719    if_exists: bool,
7720) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7721    match scx.resolve_cluster(Some(&name.cluster)) {
7722        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7723            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7724            None if if_exists => Ok(None),
7725            None => Err(sql_err!(
7726                "CLUSTER {} has no CLUSTER REPLICA named {}",
7727                cluster.name(),
7728                name.replica.as_str().quoted(),
7729            )),
7730        },
7731        Err(_) if if_exists => Ok(None),
7732        Err(e) => Err(e),
7733    }
7734}
7735
7736pub(crate) fn resolve_database<'a>(
7737    scx: &'a StatementContext,
7738    name: &'a UnresolvedDatabaseName,
7739    if_exists: bool,
7740) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7741    match scx.resolve_database(name) {
7742        Ok(database) => Ok(Some(database)),
7743        Err(_) if if_exists => Ok(None),
7744        Err(e) => Err(e),
7745    }
7746}
7747
7748pub(crate) fn resolve_schema<'a>(
7749    scx: &'a StatementContext,
7750    name: UnresolvedSchemaName,
7751    if_exists: bool,
7752) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7753    match scx.resolve_schema(name) {
7754        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7755        Err(_) if if_exists => Ok(None),
7756        Err(e) => Err(e),
7757    }
7758}
7759
7760pub(crate) fn resolve_network_policy<'a>(
7761    scx: &'a StatementContext,
7762    name: Ident,
7763    if_exists: bool,
7764) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7765    match scx.catalog.resolve_network_policy(&name.to_string()) {
7766        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7767            id: policy.id(),
7768            name: policy.name().to_string(),
7769        })),
7770        Err(_) if if_exists => Ok(None),
7771        Err(e) => Err(e.into()),
7772    }
7773}
7774
7775pub(crate) fn resolve_item_or_type<'a>(
7776    scx: &'a StatementContext,
7777    object_type: ObjectType,
7778    name: UnresolvedItemName,
7779    if_exists: bool,
7780) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7781    let name = normalize::unresolved_item_name(name)?;
7782    let catalog_item = match object_type {
7783        ObjectType::Type => scx.catalog.resolve_type(&name),
7784        _ => scx.catalog.resolve_item(&name),
7785    };
7786
7787    match catalog_item {
7788        Ok(item) => {
7789            let is_type = ObjectType::from(item.item_type());
7790            if object_type == is_type {
7791                Ok(Some(item))
7792            } else {
7793                Err(PlanError::MismatchedObjectType {
7794                    name: scx.catalog.minimal_qualification(item.name()),
7795                    is_type,
7796                    expected_type: object_type,
7797                })
7798            }
7799        }
7800        Err(_) if if_exists => Ok(None),
7801        Err(e) => Err(e.into()),
7802    }
7803}
7804
7805/// Returns an error if the given cluster is a managed cluster.
7806fn ensure_cluster_is_not_managed(
7807    scx: &StatementContext,
7808    cluster_id: ClusterId,
7809) -> Result<(), PlanError> {
7810    let cluster = scx.catalog.get_cluster(cluster_id);
7811    if cluster.is_managed() {
7812        Err(PlanError::ManagedCluster {
7813            cluster_name: cluster.name().to_string(),
7814        })
7815    } else {
7816        Ok(())
7817    }
7818}