mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
    WebhookValidation, literal, plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

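/// Matches the name format of managed cluster replicas, e.g. `r1` or `r42`.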
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns are types that have meaningful Persist-level ordering.
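///
/// For example (illustrative): for a relation with columns `(a, b)`, a
/// `PARTITION BY (a)` list is checked name-by-name against the prefix `(a)`,
/// while `PARTITION BY (b)` fails because `b` does not match the first column.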
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

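/// Plans `CREATE SCHEMA`. The schema name may have one or two components,
/// e.g. `CREATE SCHEMA db.s` or `CREATE SCHEMA s`; with a single component,
/// the schema is created in the active database.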
pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

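/// Plans `CREATE TABLE`, e.g. `CREATE TABLE t (a int NOT NULL, b text)`
/// (illustrative). Columns marked as versioned (i.e. added by a later
/// `ALTER TABLE ... ADD COLUMN`) are folded into the table's
/// `VersionedRelationDesc` rather than its initial relation description.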
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

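// Each `generate_extracted_config!` invocation below derives an
// `<Option>Extracted` struct (e.g. `CreateSourceOptionExtracted`) with one
// field per declared option plus a `seen` set, along with a `TryFrom` impl
// that collects the options from a statement's option list and applies the
// declared defaults; see e.g. `CreateSourceOptionExtracted::try_from` below.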
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

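/// Plans a webhook source or table, e.g. (illustrative)
/// `CREATE SOURCE hook FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS`.
///
/// The resulting relation always has a `body` column, plus an optional
/// `headers` map column and one column per explicitly mapped header.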
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources && in_cluster.replica_ids().len() > 1 {
        sql_bail!("cannot create webhook source in cluster with more than one replica")
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

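/// Plans `CREATE SOURCE` for the external connection types (Kafka, Postgres,
/// MySQL, SQL Server, and load generators), e.g. (illustrative)
/// `CREATE SOURCE s FROM KAFKA CONNECTION kc (TOPIC 'events') FORMAT JSON`.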
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all the options related to the
    // primary source output must be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply the user-specified key constraint.
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

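/// Dispatches on the `CREATE SOURCE` connection clause to the appropriate
/// connection-specific planner, returning the unified
/// `GenericSourceConnection`.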
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

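/// Plans a load generator source connection, e.g. (illustrative)
/// `CREATE SOURCE lg FROM LOAD GENERATOR COUNTER (TICK INTERVAL '1s')`.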
fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

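/// Plans a MySQL source connection. The `DETAILS` option carries hex-encoded,
/// protobuf-serialized `MySqlSourceDetails` produced during purification.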
fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // text/exclude columns are already part of the source-exports and are only included
        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
        // be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

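/// Plans a SQL Server source connection. As with MySQL, the `DETAILS` option
/// carries hex-encoded protobuf (`ProtoSqlServerSourceExtras`) produced during
/// purification.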
fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

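/// Plans a Postgres source connection. The `PUBLICATION` option names the
/// upstream publication (validated during purification) and `DETAILS` carries
/// hex-encoded, protobuf-serialized publication details.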
fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // Text and exclude columns are already part of the source-exports and are only
        // included in these options for round-tripping of a `CREATE SOURCE` statement.
        // They should be removed once we drop support for implicitly created subsources.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

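/// Plans a Kafka source connection, translating `INCLUDE <metadata>` clauses
/// (e.g., illustrative, `INCLUDE PARTITION, OFFSET AS o`) into the
/// connection's `metadata_columns`.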
fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _, // purified into `start_offset`
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // Handled when planning the key envelope (see `get_key_envelope`).
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

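/// Combines a source's declared format and envelope with the connection's
/// default key/value relation descriptions, producing the relation description
/// of the source's primary export, the planned `SourceEnvelope`, and the data
/// encoding (if a `FORMAT` was given).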
1284fn apply_source_envelope_encoding(
1285    scx: &StatementContext,
1286    envelope: &ast::SourceEnvelope,
1287    format: &Option<FormatSpecifier<Aug>>,
1288    key_desc: Option<RelationDesc>,
1289    value_desc: RelationDesc,
1290    include_metadata: &[SourceIncludeMetadata],
1291    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
1292    source_connection: &GenericSourceConnection<ReferencedConnection>,
1293) -> Result<
1294    (
1295        RelationDesc,
1296        SourceEnvelope,
1297        Option<SourceDataEncoding<ReferencedConnection>>,
1298    ),
1299    PlanError,
1300> {
1301    let encoding = match format {
1302        Some(format) => Some(get_encoding(scx, format, envelope)?),
1303        None => None,
1304    };
1305
1306    let (key_desc, value_desc) = match &encoding {
1307        Some(encoding) => {
1308            // If we are applying an encoding we need to ensure that the incoming value_desc is a
1309            // single column of type bytes.
1310            match value_desc.typ().columns() {
1311                [typ] => match typ.scalar_type {
1312                    SqlScalarType::Bytes => {}
1313                    _ => sql_bail!(
1314                        "The schema produced by the source is incompatible with format decoding"
1315                    ),
1316                },
1317                _ => sql_bail!(
1318                    "The schema produced by the source is incompatible with format decoding"
1319                ),
1320            }
1321
1322            let (key_desc, value_desc) = encoding.desc()?;
1323
1324            // TODO(petrosagg): This piece of code seems to be making a statement about the
1325            // nullability of the NONE envelope when the source is Kafka. As written, the code
1326            // misses opportunities to mark columns as not nullable and is over conservative. For
1327            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
1328            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
1329            // should be replaced with precise type-level reasoning.
1330            let key_desc = key_desc.map(|desc| {
1331                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1332                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1333                if is_kafka && is_envelope_none {
1334                    RelationDesc::from_names_and_types(
1335                        desc.into_iter()
1336                            .map(|(name, typ)| (name, typ.nullable(true))),
1337                    )
1338                } else {
1339                    desc
1340                }
1341            });
1342            (key_desc, value_desc)
1343        }
1344        None => (key_desc, value_desc),
1345    };
1346
1347    // The KEY VALUE load generator is the only UPSERT source that
1348    // has no encoding but defaults to `INCLUDE KEY`.
1349    //
1350    // As discussed in
1351    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1352    // removing this special case amounts to deciding how to handle null keys
1353    // from sources in a holistic way. We aren't yet prepared to do this, so we leave
1354    // this special case in.
1355    //
1356    // Note that this is safe because this generator is
1357    // 1. The only source with no encoding that can have its key included.
1358    // 2. Never produces null keys (or values, for that matter).
1359    let key_envelope_no_encoding = matches!(
1360        source_connection,
1361        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1362            load_generator: LoadGenerator::KeyValue(_),
1363            ..
1364        })
1365    );
1366    let mut key_envelope = get_key_envelope(
1367        include_metadata,
1368        encoding.as_ref(),
1369        key_envelope_no_encoding,
1370    )?;
1371
1372    match (&envelope, &key_envelope) {
1373        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1374        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1375            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1376        ),
1377        _ => {}
1378    };
1379
1380    // Not all source envelopes are compatible with all source connections.
1381    // Whoever constructs the source ingestion pipeline is responsible for
1382    // choosing compatible envelopes and connections.
1383    //
1384    // TODO(guswynn): unambiguously assert which connections and envelopes are
1385    // compatible in typechecking
1386    //
1387    // TODO: remove bails as more support for upsert is added.
1388    let envelope = match &envelope {
1389        // TODO: fixup key envelope
1390        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1391        ast::SourceEnvelope::Debezium => {
1392            // TODO: check that key envelope is not set
1393            let after_idx = match typecheck_debezium(&value_desc) {
1394                Ok((_before_idx, after_idx)) => Ok(after_idx),
1395                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1396                    Some(DataEncoding::Avro(_)) => Err(type_err),
1397                    _ => Err(sql_err!(
1398                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1399                    )),
1400                },
1401            }?;
1402
1403            UnplannedSourceEnvelope::Upsert {
1404                style: UpsertStyle::Debezium { after_idx },
1405            }
1406        }
1407        ast::SourceEnvelope::Upsert {
1408            value_decode_err_policy,
1409        } => {
1410            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1411                None => {
1412                    if !key_envelope_no_encoding {
1413                        bail_unsupported!(format!(
1414                            "UPSERT requires a key/value format: {:?}",
1415                            format
1416                        ))
1417                    }
1418                    None
1419                }
1420                Some(key_encoding) => Some(key_encoding),
1421            };
1422            // `ENVELOPE UPSERT` implies `INCLUDE KEY` if it is not explicitly
1423            // specified.
1424            if key_envelope == KeyEnvelope::None {
1425                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1426            }
1427            // If the value decode error policy is not set, we use the default upsert style.
1428            let style = match value_decode_err_policy.as_slice() {
1429                [] => UpsertStyle::Default(key_envelope),
1430                [SourceErrorPolicy::Inline { alias }] => {
1431                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1432                    UpsertStyle::ValueErrInline {
1433                        key_envelope,
1434                        error_column: alias
1435                            .as_ref()
1436                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1437                    }
1438                }
1439                _ => {
1440                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1441                }
1442            };
1443
1444            UnplannedSourceEnvelope::Upsert { style }
1445        }
1446        ast::SourceEnvelope::CdcV2 => {
1447            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1448            // TODO: check that key envelope is not set
1449            match format {
1450                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1451                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1452            }
1453            UnplannedSourceEnvelope::CdcV2
1454        }
1455    };
1456
1457    let metadata_desc = included_column_desc(metadata_columns_desc);
1458    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1459
1460    Ok((desc, envelope, encoding))
1461}
1462
1463/// Plans the `RelationDesc` for a source export (subsource or table) that has a defined
1464/// list of columns and constraints.
1465fn plan_source_export_desc(
1466    scx: &StatementContext,
1467    name: &UnresolvedItemName,
1468    columns: &Vec<ColumnDef<Aug>>,
1469    constraints: &Vec<TableConstraint<Aug>>,
1470) -> Result<RelationDesc, PlanError> {
1471    let names: Vec<_> = columns
1472        .iter()
1473        .map(|c| normalize::column_name(c.name.clone()))
1474        .collect();
1475
1476    if let Some(dup) = names.iter().duplicates().next() {
1477        sql_bail!("column {} specified more than once", dup.quoted());
1478    }
1479
1480    // Build initial relation type that handles declared data types
1481    // and NOT NULL constraints.
1482    let mut column_types = Vec::with_capacity(columns.len());
1483    let mut keys = Vec::new();
1484
1485    for (i, c) in columns.into_iter().enumerate() {
1486        let aug_data_type = &c.data_type;
1487        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1488        let mut nullable = true;
1489        for option in &c.options {
1490            match &option.option {
1491                ColumnOption::NotNull => nullable = false,
1492                ColumnOption::Default(_) => {
1493                    bail_unsupported!("Source export with default value")
1494                }
1495                ColumnOption::Unique { is_primary } => {
1496                    keys.push(vec![i]);
1497                    if *is_primary {
1498                        nullable = false;
1499                    }
1500                }
1501                other => {
1502                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1503                }
1504            }
1505        }
1506        column_types.push(ty.nullable(nullable));
1507    }
1508
1509    let mut seen_primary = false;
1510    'c: for constraint in constraints {
1511        match constraint {
1512            TableConstraint::Unique {
1513                name: _,
1514                columns,
1515                is_primary,
1516                nulls_not_distinct,
1517            } => {
1518                if seen_primary && *is_primary {
1519                    sql_bail!(
1520                        "multiple primary keys for source export {} are not allowed",
1521                        name.to_ast_string_stable()
1522                    );
1523                }
1524                seen_primary = *is_primary || seen_primary;
1525
1526                let mut key = vec![];
1527                for column in columns {
1528                    let column = normalize::column_name(column.clone());
1529                    match names.iter().position(|name| *name == column) {
1530                        None => sql_bail!("unknown column in constraint: {}", column),
1531                        Some(i) => {
1532                            let nullable = &mut column_types[i].nullable;
1533                            if *is_primary {
1534                                if *nulls_not_distinct {
1535                                    sql_bail!(
1536                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1537                                    );
1538                                }
1539                                *nullable = false;
1540                            } else if !(*nulls_not_distinct || !*nullable) {
1541                                // Non-primary key unique constraints are only keys if all of their
1542                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1543                                break 'c;
1544                            }
1545
1546                            key.push(i);
1547                        }
1548                    }
1549                }
1550
1551                if *is_primary {
1552                    keys.insert(0, key);
1553                } else {
1554                    keys.push(key);
1555                }
1556            }
1557            TableConstraint::ForeignKey { .. } => {
1558                bail_unsupported!("Source export with a foreign key")
1559            }
1560            TableConstraint::Check { .. } => {
1561                bail_unsupported!("Source export with a check constraint")
1562            }
1563        }
1564    }
1565
1566    let typ = SqlRelationType::new(column_types).with_keys(keys);
1567    let desc = RelationDesc::new(typ, names);
1568    Ok(desc)
1569}
1570
1571generate_extracted_config!(
1572    CreateSubsourceOption,
1573    (Progress, bool, Default(false)),
1574    (ExternalReference, UnresolvedItemName),
1575    (RetainHistory, OptionalDuration),
1576    (TextColumns, Vec::<Ident>, Default(vec![])),
1577    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1578    (Details, String)
1579);
1580
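/// Plans a `CREATE SUBSOURCE` statement. Subsource statements are generated
/// internally in response to `CREATE SOURCE` statements, so the checks here
/// guard internal invariants rather than user-facing input.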
1581pub fn plan_create_subsource(
1582    scx: &StatementContext,
1583    stmt: CreateSubsourceStatement<Aug>,
1584) -> Result<Plan, PlanError> {
1585    let CreateSubsourceStatement {
1586        name,
1587        columns,
1588        of_source,
1589        constraints,
1590        if_not_exists,
1591        with_options,
1592    } = &stmt;
1593
1594    let CreateSubsourceOptionExtracted {
1595        progress,
1596        retain_history,
1597        external_reference,
1598        text_columns,
1599        exclude_columns,
1600        details,
1601        seen: _,
1602    } = with_options.clone().try_into()?;
1603
1604    // This invariant is enforced during purification; we are responsible for
1605    // creating the AST for subsources as a response to CREATE SOURCE
1606    // statements, so this would fire in integration testing if we failed to
1607    // uphold it.
1608    assert!(
1609        progress ^ (external_reference.is_some() && of_source.is_some()),
1610        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1611    );
1612
1613    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615    let data_source = if let Some(source_reference) = of_source {
1616        // If the new source table syntax is forced, we should not be creating any
1617        // non-progress subsources.
1618        if scx.catalog.system_vars().enable_create_table_from_source()
1619            && scx.catalog.system_vars().force_source_table_syntax()
1620        {
1621            Err(PlanError::UseTablesForSources(
1622                "CREATE SUBSOURCE".to_string(),
1623            ))?;
1624        }
1625
1626        // This is a subsource with the "natural" dependency order, i.e. it is
1627        // not a legacy subsource with the inverted structure.
1628        let ingestion_id = *source_reference.item_id();
1629        let external_reference = external_reference.unwrap();
1630
1631        // Decode the details option stored on the subsource statement, which contains information
1632        // created during the purification process.
1633        let details = details
1634            .as_ref()
1635            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1636        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1637        let details =
1638            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1639        let details =
1640            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1641        let details = match details {
1642            SourceExportStatementDetails::Postgres { table } => {
1643                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1644                    column_casts: crate::pure::postgres::generate_column_casts(
1645                        scx,
1646                        &table,
1647                        &text_columns,
1648                    )?,
1649                    table,
1650                })
1651            }
1652            SourceExportStatementDetails::MySql {
1653                table,
1654                initial_gtid_set,
1655            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1656                table,
1657                initial_gtid_set,
1658                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1659                exclude_columns: exclude_columns
1660                    .into_iter()
1661                    .map(|c| c.into_string())
1662                    .collect(),
1663            }),
1664            SourceExportStatementDetails::SqlServer {
1665                table,
1666                capture_instance,
1667                initial_lsn,
1668            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1669                capture_instance,
1670                table,
1671                initial_lsn,
1672                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1673                exclude_columns: exclude_columns
1674                    .into_iter()
1675                    .map(|c| c.into_string())
1676                    .collect(),
1677            }),
1678            SourceExportStatementDetails::LoadGenerator { output } => {
1679                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1680            }
1681            SourceExportStatementDetails::Kafka {} => {
1682                bail_unsupported!("subsources cannot reference Kafka sources")
1683            }
1684        };
1685        DataSourceDesc::IngestionExport {
1686            ingestion_id,
1687            external_reference,
1688            details,
1689            // Subsources don't currently support non-default envelopes or encodings.
1690            data_config: SourceExportDataConfig {
1691                envelope: SourceEnvelope::None(NoneEnvelope {
1692                    key_envelope: KeyEnvelope::None,
1693                    key_arity: 0,
1694                }),
1695                encoding: None,
1696            },
1697        }
1698    } else if progress {
1699        DataSourceDesc::Progress
1700    } else {
1701        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1702    };
1703
1704    let if_not_exists = *if_not_exists;
1705    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1706
1707    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1708
1709    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1710    let source = Source {
1711        create_sql,
1712        data_source,
1713        desc,
1714        compaction_window,
1715    };
1716
1717    Ok(Plan::CreateSource(CreateSourcePlan {
1718        name,
1719        source,
1720        if_not_exists,
1721        timeline: Timeline::EpochMilliseconds,
1722        in_cluster: None,
1723    }))
1724}
1725
1726generate_extracted_config!(
1727    TableFromSourceOption,
1728    (TextColumns, Vec::<Ident>, Default(vec![])),
1729    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1730    (PartitionBy, Vec<Ident>),
1731    (RetainHistory, OptionalDuration),
1732    (Details, String)
1733);
1734
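/// Plans a `CREATE TABLE ... FROM SOURCE` statement, which creates a table
/// backed by data exported from an existing source.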
1735pub fn plan_create_table_from_source(
1736    scx: &StatementContext,
1737    stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739    if !scx.catalog.system_vars().enable_create_table_from_source() {
1740        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741    }
1742
1743    let CreateTableFromSourceStatement {
1744        name,
1745        columns,
1746        constraints,
1747        if_not_exists,
1748        source,
1749        external_reference,
1750        envelope,
1751        format,
1752        include_metadata,
1753        with_options,
1754    } = &stmt;
1755
1756    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758    let TableFromSourceOptionExtracted {
1759        text_columns,
1760        exclude_columns,
1761        retain_history,
1762        partition_by,
1763        details,
1764        seen: _,
1765    } = with_options.clone().try_into()?;
1766
1767    let source_item = scx.get_item_by_resolved_name(source)?;
1768    let ingestion_id = source_item.id();
1769
1770    // Decode the details option stored on the statement, which contains information
1771    // created during the purification process.
1772    let details = details
1773        .as_ref()
1774        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776    let details =
1777        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778    let details =
1779        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782        && include_metadata
1783            .iter()
1784            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785    {
1786        // TODO(guswynn): should this be `bail_unsupported!`?
1787        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1788    }
1789    if !matches!(
1790        details,
1791        SourceExportStatementDetails::Kafka { .. }
1792            | SourceExportStatementDetails::LoadGenerator { .. }
1793    ) && !include_metadata.is_empty()
1794    {
1795        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796    }
1797
1798    let details = match details {
1799        SourceExportStatementDetails::Postgres { table } => {
1800            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1801                column_casts: crate::pure::postgres::generate_column_casts(
1802                    scx,
1803                    &table,
1804                    &text_columns,
1805                )?,
1806                table,
1807            })
1808        }
1809        SourceExportStatementDetails::MySql {
1810            table,
1811            initial_gtid_set,
1812        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1813            table,
1814            initial_gtid_set,
1815            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1816            exclude_columns: exclude_columns
1817                .into_iter()
1818                .map(|c| c.into_string())
1819                .collect(),
1820        }),
1821        SourceExportStatementDetails::SqlServer {
1822            table,
1823            capture_instance,
1824            initial_lsn,
1825        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1826            table,
1827            capture_instance,
1828            initial_lsn,
1829            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830            exclude_columns: exclude_columns
1831                .into_iter()
1832                .map(|c| c.into_string())
1833                .collect(),
1834        }),
1835        SourceExportStatementDetails::LoadGenerator { output } => {
1836            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1837        }
1838        SourceExportStatementDetails::Kafka {} => {
1839            if !include_metadata.is_empty()
1840                && !matches!(
1841                    envelope,
1842                    ast::SourceEnvelope::Upsert { .. }
1843                        | ast::SourceEnvelope::None
1844                        | ast::SourceEnvelope::Debezium
1845                )
1846            {
1847                // TODO(guswynn): should this be `bail_unsupported!`?
1848                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1849            }
1850
1851            let metadata_columns = include_metadata
1852                .into_iter()
1853                .flat_map(|item| match item {
1854                    SourceIncludeMetadata::Timestamp { alias } => {
1855                        let name = match alias {
1856                            Some(name) => name.to_string(),
1857                            None => "timestamp".to_owned(),
1858                        };
1859                        Some((name, KafkaMetadataKind::Timestamp))
1860                    }
1861                    SourceIncludeMetadata::Partition { alias } => {
1862                        let name = match alias {
1863                            Some(name) => name.to_string(),
1864                            None => "partition".to_owned(),
1865                        };
1866                        Some((name, KafkaMetadataKind::Partition))
1867                    }
1868                    SourceIncludeMetadata::Offset { alias } => {
1869                        let name = match alias {
1870                            Some(name) => name.to_string(),
1871                            None => "offset".to_owned(),
1872                        };
1873                        Some((name, KafkaMetadataKind::Offset))
1874                    }
1875                    SourceIncludeMetadata::Headers { alias } => {
1876                        let name = match alias {
1877                            Some(name) => name.to_string(),
1878                            None => "headers".to_owned(),
1879                        };
1880                        Some((name, KafkaMetadataKind::Headers))
1881                    }
1882                    SourceIncludeMetadata::Header {
1883                        alias,
1884                        key,
1885                        use_bytes,
1886                    } => Some((
1887                        alias.to_string(),
1888                        KafkaMetadataKind::Header {
1889                            key: key.clone(),
1890                            use_bytes: *use_bytes,
1891                        },
1892                    )),
1893                    SourceIncludeMetadata::Key { .. } => {
1894                        // handled below
1895                        None
1896                    }
1897                })
1898                .collect();
1899
1900            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1901        }
1902    };
1903
1904    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1905
1906    // Some source types (e.g. postgres, mysql, multi-output load-gen sources) define a value
1907    // schema during purification and populate the `columns` and `constraints` fields of the
1908    // statement, whereas other source types (e.g. kafka, single-output load-gen sources) do
1909    // not; for those we use the source connection's default schema.
1910    let (key_desc, value_desc) =
1911        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1912            let columns = match columns {
1913                TableFromSourceColumns::Defined(columns) => columns,
1914                _ => unreachable!(),
1915            };
1916            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1917            (None, desc)
1918        } else {
1919            let key_desc = source_connection.default_key_desc();
1920            let value_desc = source_connection.default_value_desc();
1921            (Some(key_desc), value_desc)
1922        };
1923
1924    let metadata_columns_desc = match &details {
1925        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1926            metadata_columns, ..
1927        }) => kafka_metadata_columns_desc(metadata_columns),
1928        _ => vec![],
1929    };
1930
1931    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1932        scx,
1933        &envelope,
1934        format,
1935        key_desc,
1936        value_desc,
1937        include_metadata,
1938        metadata_columns_desc,
1939        source_connection,
1940    )?;
1941    if let TableFromSourceColumns::Named(col_names) = columns {
1942        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1943    }
1944
1945    let names: Vec<_> = desc.iter_names().cloned().collect();
1946    if let Some(dup) = names.iter().duplicates().next() {
1947        sql_bail!("column {} specified more than once", dup.quoted());
1948    }
1949
1950    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1951
1952    // Allow users to specify a timeline. If they do not, determine a default
1953    // timeline for the source.
1954    let timeline = match envelope {
1955        SourceEnvelope::CdcV2 => {
1956            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1957        }
1958        _ => Timeline::EpochMilliseconds,
1959    };
1960
1961    if let Some(partition_by) = partition_by {
1962        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1963        check_partition_by(&desc, partition_by)?;
1964    }
1965
1966    let data_source = DataSourceDesc::IngestionExport {
1967        ingestion_id,
1968        external_reference: external_reference
1969            .as_ref()
1970            .expect("populated in purification")
1971            .clone(),
1972        details,
1973        data_config: SourceExportDataConfig { envelope, encoding },
1974    };
1975
1976    let if_not_exists = *if_not_exists;
1977
1978    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1979
1980    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1981    let table = Table {
1982        create_sql,
1983        desc: VersionedRelationDesc::new(desc),
1984        temporary: false,
1985        compaction_window,
1986        data_source: TableDataSource::DataSource {
1987            desc: data_source,
1988            timeline,
1989        },
1990    };
1991
1992    Ok(Plan::CreateTable(CreateTablePlan {
1993        name,
1994        table,
1995        if_not_exists,
1996    }))
1997}
1998
1999generate_extracted_config!(
2000    LoadGeneratorOption,
2001    (TickInterval, Duration),
2002    (AsOf, u64, Default(0_u64)),
2003    (UpTo, u64, Default(u64::MAX)),
2004    (ScaleFactor, f64),
2005    (MaxCardinality, u64),
2006    (Keys, u64),
2007    (SnapshotRounds, u64),
2008    (TransactionalSnapshot, bool),
2009    (ValueSize, u64),
2010    (Seed, u64),
2011    (Partitions, u64),
2012    (BatchSize, u64)
2013);
2014
2015impl LoadGeneratorOptionExtracted {
2016    pub(super) fn ensure_only_valid_options(
2017        &self,
2018        loadgen: &ast::LoadGenerator,
2019    ) -> Result<(), PlanError> {
2020        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2021
2022        let mut options = self.seen.clone();
2023
2024        let permitted_options: &[_] = match loadgen {
2025            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2026            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2027            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2028            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2029            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2030            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2031            ast::LoadGenerator::KeyValue => &[
2032                TickInterval,
2033                Keys,
2034                SnapshotRounds,
2035                TransactionalSnapshot,
2036                ValueSize,
2037                Seed,
2038                Partitions,
2039                BatchSize,
2040            ],
2041        };
2042
2043        for o in permitted_options {
2044            options.remove(o);
2045        }
2046
2047        if !options.is_empty() {
2048            sql_bail!(
2049                "{} load generators do not support {} values",
2050                loadgen,
2051                options.iter().join(", ")
2052            )
2053        }
2054
2055        Ok(())
2056    }
2057}
2058
2059pub(crate) fn load_generator_ast_to_generator(
2060    scx: &StatementContext,
2061    loadgen: &ast::LoadGenerator,
2062    options: &[LoadGeneratorOption<Aug>],
2063    include_metadata: &[SourceIncludeMetadata],
2064) -> Result<LoadGenerator, PlanError> {
2065    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2066    extracted.ensure_only_valid_options(loadgen)?;
2067
2068    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2069        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2070    }
2071
2072    let load_generator = match loadgen {
2073        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2074        ast::LoadGenerator::Clock => {
2075            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2076            LoadGenerator::Clock
2077        }
2078        ast::LoadGenerator::Counter => {
2079            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2080            let LoadGeneratorOptionExtracted {
2081                max_cardinality, ..
2082            } = extracted;
2083            LoadGenerator::Counter { max_cardinality }
2084        }
2085        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2086        ast::LoadGenerator::Datums => {
2087            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2088            LoadGenerator::Datums
2089        }
2090        ast::LoadGenerator::Tpch => {
2091            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2092
2093            // Default to 0.01 scale factor (=10MB).
2094            let sf: f64 = scale_factor.unwrap_or(0.01);
2095            if !sf.is_finite() || sf < 0.0 {
2096                sql_bail!("unsupported scale factor {sf}");
2097            }
2098
2099            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2100                let total = (sf * multiplier).floor();
2101                let mut i = i64::try_cast_from(total)
2102                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2103                if i < 1 {
2104                    i = 1;
2105                }
2106                Ok(i)
2107            };
2108
2109            // The multiplications here are safely unchecked because they will
2110            // overflow to infinity, which will be caught by `f_to_i`.
2111            let count_supplier = f_to_i(10_000f64)?;
2112            let count_part = f_to_i(200_000f64)?;
2113            let count_customer = f_to_i(150_000f64)?;
2114            let count_orders = f_to_i(150_000f64 * 10f64)?;
2115            let count_clerk = f_to_i(1_000f64)?;
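            // For example, the default scale factor of 0.01 yields
            // count_supplier = 100, count_part = 2_000, count_customer = 1_500,
            // count_orders = 15_000, and count_clerk = 10.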
2116
2117            LoadGenerator::Tpch {
2118                count_supplier,
2119                count_part,
2120                count_customer,
2121                count_orders,
2122                count_clerk,
2123            }
2124        }
2125        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2126            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2127            let LoadGeneratorOptionExtracted {
2128                keys,
2129                snapshot_rounds,
2130                transactional_snapshot,
2131                value_size,
2132                tick_interval,
2133                seed,
2134                partitions,
2135                batch_size,
2136                ..
2137            } = extracted;
2138
2139            let mut include_offset = None;
2140            for im in include_metadata {
2141                match im {
2142                    SourceIncludeMetadata::Offset { alias } => {
2143                        include_offset = match alias {
2144                            Some(alias) => Some(alias.to_string()),
2145                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2146                        }
2147                    }
2148                    SourceIncludeMetadata::Key { .. } => continue,
2149
2150                    _ => {
2151                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2152                    }
2153                };
2154            }
2155
2156            let lgkv = KeyValueLoadGenerator {
2157                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2158                snapshot_rounds: snapshot_rounds
2159                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2160                // Defaults to true.
2161                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2162                value_size: value_size
2163                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2164                partitions: partitions
2165                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2166                tick_interval,
2167                batch_size: batch_size
2168                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2169                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2170                include_offset,
2171            };
2172
2173            if lgkv.keys == 0
2174                || lgkv.partitions == 0
2175                || lgkv.value_size == 0
2176                || lgkv.batch_size == 0
2177            {
2178                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2179            }
2180
2181            if lgkv.keys % lgkv.partitions != 0 {
2182                sql_bail!("KEYS must be a multiple of PARTITIONS")
2183            }
2184
2185            if lgkv.batch_size > lgkv.keys {
2186                sql_bail!("BATCH SIZE must not be larger than KEYS")
2187            }
2188
2189            // This constraint simplifies the source implementation;
2190            // we can lift it later.
2191            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2192                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2193            }
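            // For example, KEYS = 128, PARTITIONS = 4, and BATCH SIZE = 8
            // satisfy all of the checks above: 128 % 4 == 0, 8 <= 128, and
            // (128 / 4) % 8 == 0.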
2194
2195            if lgkv.snapshot_rounds == 0 {
2196                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2197            }
2198
2199            LoadGenerator::KeyValue(lgkv)
2200        }
2201    };
2202
2203    Ok(load_generator)
2204}
2205
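/// Checks that a Debezium-encoded value relation has the expected
/// `before`/`after` structure, returning the column indexes of the optional
/// `before` column and the required `after` column.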
2206fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2207    let before = value_desc.get_by_name(&"before".into());
2208    let (after_idx, after_ty) = value_desc
2209        .get_by_name(&"after".into())
2210        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2211    let before_idx = if let Some((before_idx, before_ty)) = before {
2212        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2213            sql_bail!("'before' column must be of type record");
2214        }
2215        if before_ty != after_ty {
2216            sql_bail!("'before' column type differs from 'after' column type");
2217        }
2218        Some(before_idx)
2219    } else {
2220        None
2221    };
2222    Ok((before_idx, after_idx))
2223}
2224
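/// Plans the key and value encodings implied by a source's `FORMAT` clause,
/// and checks that envelopes which require a key (`DEBEZIUM`, `UPSERT`) were
/// given a key/value format.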
2225fn get_encoding(
2226    scx: &StatementContext,
2227    format: &FormatSpecifier<Aug>,
2228    envelope: &ast::SourceEnvelope,
2229) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2230    let encoding = match format {
2231        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2232        FormatSpecifier::KeyValue { key, value } => {
2233            let key = {
2234                let encoding = get_encoding_inner(scx, key)?;
2235                Some(encoding.key.unwrap_or(encoding.value))
2236            };
2237            let value = get_encoding_inner(scx, value)?.value;
2238            SourceDataEncoding { key, value }
2239        }
2240    };
2241
2242    let requires_keyvalue = matches!(
2243        envelope,
2244        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2245    );
2246    let is_keyvalue = encoding.key.is_some();
2247    if requires_keyvalue && !is_keyvalue {
2248        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2249    };
2250
2251    Ok(encoding)
2252}
2253
2254/// Determine the cluster ID to use for this item.
2255///
2256/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2257/// Because of this, do not normalize/canonicalize the create SQL statement
2258/// until after calling this function.
2259fn source_sink_cluster_config<'a, 'ctx>(
2260    scx: &'a StatementContext<'ctx>,
2261    in_cluster: &mut Option<ResolvedClusterName>,
2262) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2263    let cluster = match in_cluster {
2264        None => {
2265            let cluster = scx.catalog.resolve_cluster(None)?;
2266            *in_cluster = Some(ResolvedClusterName {
2267                id: cluster.id(),
2268                print_name: None,
2269            });
2270            cluster
2271        }
2272        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2273    };
2274
2275    Ok(cluster)
2276}
2277
2278generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2279
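/// A resolved schema for a source format: an optional key schema, a value
/// schema, and the schema registry connection they came from, if any.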
2280#[derive(Debug)]
2281pub struct Schema {
2282    pub key_schema: Option<String>,
2283    pub value_schema: String,
2284    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2285    pub confluent_wire_format: bool,
2286}
2287
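/// Plans the `SourceDataEncoding` for a single bare format. Only Avro and
/// Protobuf schemas can carry a key encoding; every other format produces a
/// value encoding only.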
2288fn get_encoding_inner(
2289    scx: &StatementContext,
2290    format: &Format<Aug>,
2291) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2292    let value = match format {
2293        Format::Bytes => DataEncoding::Bytes,
2294        Format::Avro(schema) => {
2295            let Schema {
2296                key_schema,
2297                value_schema,
2298                csr_connection,
2299                confluent_wire_format,
2300            } = match schema {
2301                // TODO(jldlaughlin): we need a way to pass in primary key information
2302                // when building a source from a string or file.
2303                AvroSchema::InlineSchema {
2304                    schema: ast::Schema { schema },
2305                    with_options,
2306                } => {
2307                    let AvroSchemaOptionExtracted {
2308                        confluent_wire_format,
2309                        ..
2310                    } = with_options.clone().try_into()?;
2311
2312                    Schema {
2313                        key_schema: None,
2314                        value_schema: schema.clone(),
2315                        csr_connection: None,
2316                        confluent_wire_format,
2317                    }
2318                }
2319                AvroSchema::Csr {
2320                    csr_connection:
2321                        CsrConnectionAvro {
2322                            connection,
2323                            seed,
2324                            key_strategy: _,
2325                            value_strategy: _,
2326                        },
2327                } => {
2328                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2329                    let csr_connection = match item.connection()? {
2330                        Connection::Csr(_) => item.id(),
2331                        _ => {
2332                            sql_bail!(
2333                                "{} is not a schema registry connection",
2334                                scx.catalog
2335                                    .resolve_full_name(item.name())
2336                                    .to_string()
2337                                    .quoted()
2338                            )
2339                        }
2340                    };
2341
2342                    if let Some(seed) = seed {
2343                        Schema {
2344                            key_schema: seed.key_schema.clone(),
2345                            value_schema: seed.value_schema.clone(),
2346                            csr_connection: Some(csr_connection),
2347                            confluent_wire_format: true,
2348                        }
2349                    } else {
2350                        unreachable!("CSR seed resolution should already have been called: Avro")
2351                    }
2352                }
2353            };
2354
2355            if let Some(key_schema) = key_schema {
2356                return Ok(SourceDataEncoding {
2357                    key: Some(DataEncoding::Avro(AvroEncoding {
2358                        schema: key_schema,
2359                        csr_connection: csr_connection.clone(),
2360                        confluent_wire_format,
2361                    })),
2362                    value: DataEncoding::Avro(AvroEncoding {
2363                        schema: value_schema,
2364                        csr_connection,
2365                        confluent_wire_format,
2366                    }),
2367                });
2368            } else {
2369                DataEncoding::Avro(AvroEncoding {
2370                    schema: value_schema,
2371                    csr_connection,
2372                    confluent_wire_format,
2373                })
2374            }
2375        }
2376        Format::Protobuf(schema) => match schema {
2377            ProtobufSchema::Csr {
2378                csr_connection:
2379                    CsrConnectionProtobuf {
2380                        connection:
2381                            CsrConnection {
2382                                connection,
2383                                options,
2384                            },
2385                        seed,
2386                    },
2387            } => {
2388                if let Some(CsrSeedProtobuf { key, value }) = seed {
2389                    let item = scx.get_item_by_resolved_name(connection)?;
2390                    let _ = match item.connection()? {
2391                        Connection::Csr(connection) => connection,
2392                        _ => {
2393                            sql_bail!(
2394                                "{} is not a schema registry connection",
2395                                scx.catalog
2396                                    .resolve_full_name(item.name())
2397                                    .to_string()
2398                                    .quoted()
2399                            )
2400                        }
2401                    };
2402
2403                    if !options.is_empty() {
2404                        sql_bail!("Protobuf CSR connections do not support any options");
2405                    }
2406
2407                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2408                        descriptors: strconv::parse_bytes(&value.schema)?,
2409                        message_name: value.message_name.clone(),
2410                        confluent_wire_format: true,
2411                    });
2412                    if let Some(key) = key {
2413                        return Ok(SourceDataEncoding {
2414                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2415                                descriptors: strconv::parse_bytes(&key.schema)?,
2416                                message_name: key.message_name.clone(),
2417                                confluent_wire_format: true,
2418                            })),
2419                            value,
2420                        });
2421                    }
2422                    value
2423                } else {
2424                    unreachable!("CSR seed resolution should already have been called: Proto")
2425                }
2426            }
2427            ProtobufSchema::InlineSchema {
2428                message_name,
2429                schema: ast::Schema { schema },
2430            } => {
2431                let descriptors = strconv::parse_bytes(schema)?;
2432
2433                DataEncoding::Protobuf(ProtobufEncoding {
2434                    descriptors,
2435                    message_name: message_name.to_owned(),
2436                    confluent_wire_format: false,
2437                })
2438            }
2439        },
2440        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2441            regex: mz_repr::adt::regex::Regex::new(regex, false)
2442                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2443        }),
2444        Format::Csv { columns, delimiter } => {
2445            let columns = match columns {
2446                CsvColumns::Header { names } => {
2447                    if names.is_empty() {
2448                        sql_bail!("[internal error] column spec should get names in purify")
2449                    }
2450                    ColumnSpec::Header {
2451                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2452                    }
2453                }
2454                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2455            };
2456            DataEncoding::Csv(CsvEncoding {
2457                columns,
2458                delimiter: u8::try_from(*delimiter)
2459                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2460            })
2461        }
2462        Format::Json { array: false } => DataEncoding::Json,
2463        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2464        Format::Text => DataEncoding::Text,
2465    };
2466    Ok(SourceDataEncoding { key: None, value })
2467}
2468
2469/// Extracts the key envelope, if one is requested.
2470fn get_key_envelope(
2471    included_items: &[SourceIncludeMetadata],
2472    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2473    key_envelope_no_encoding: bool,
2474) -> Result<KeyEnvelope, PlanError> {
2475    let key_definition = included_items
2476        .iter()
2477        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2478    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2479        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2480            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2481            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2482            (Some(name), _) if key_envelope_no_encoding => {
2483                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2484            }
2485            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2486            (_, None) => {
2487                // `alias == None` means `INCLUDE KEY`.
2488                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
2489                // Both cases warrant the same error message.
2490                sql_bail!(
2491                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2492                        got bare FORMAT"
2493                );
2494            }
2495        }
2496    } else {
2497        Ok(KeyEnvelope::None)
2498    }
2499}
2500
2501/// Gets the key envelope for a given key encoding when no name for the key has
2502/// been requested by the user.
2503fn get_unnamed_key_envelope(
2504    key: Option<&DataEncoding<ReferencedConnection>>,
2505) -> Result<KeyEnvelope, PlanError> {
2506    // If the key is requested but comes from an unnamed type, then it gets the name "key".
2507    //
2508    // Otherwise it gets the names of the columns in the type.
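    // For example, a bare `TEXT` key becomes a single column named "key",
    // while an Avro record key with fields `a` and `b` is flattened into
    // columns named `a` and `b`.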
2509    let is_composite = match key {
2510        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2511        Some(
2512            DataEncoding::Avro(_)
2513            | DataEncoding::Csv(_)
2514            | DataEncoding::Protobuf(_)
2515            | DataEncoding::Regex { .. },
2516        ) => true,
2517        None => false,
2518    };
2519
2520    if is_composite {
2521        Ok(KeyEnvelope::Flattened)
2522    } else {
2523        Ok(KeyEnvelope::Named("key".to_string()))
2524    }
2525}
2526
2527pub fn describe_create_view(
2528    _: &StatementContext,
2529    _: CreateViewStatement<Aug>,
2530) -> Result<StatementDesc, PlanError> {
2531    Ok(StatementDesc::new(None))
2532}
2533
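/// Plans the definition of a (possibly temporary) view, returning its
/// qualified name and the planned `View`.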
2534pub fn plan_view(
2535    scx: &StatementContext,
2536    def: &mut ViewDefinition<Aug>,
2537    temporary: bool,
2538) -> Result<(QualifiedItemName, View), PlanError> {
2539    let create_sql = normalize::create_statement(
2540        scx,
2541        Statement::CreateView(CreateViewStatement {
2542            if_exists: IfExistsBehavior::Error,
2543            temporary,
2544            definition: def.clone(),
2545        }),
2546    )?;
2547
2548    let ViewDefinition {
2549        name,
2550        columns,
2551        query,
2552    } = def;
2553
2554    let query::PlannedRootQuery {
2555        expr,
2556        mut desc,
2557        finishing,
2558        scope: _,
2559    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2560    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2561    // Note: Earlier, we considered persisting the finishing information with the view
2562    // here to help with database-issues#236. However, in the meantime, there might be a better
2563    // approach to solving database-issues#236:
2564    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2565    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2566        &finishing,
2567        expr.arity()
2568    ));
2569    if expr.contains_parameters()? {
2570        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2571    }
2572
2573    let dependencies = expr
2574        .depends_on()
2575        .into_iter()
2576        .map(|gid| scx.catalog.resolve_item_id(&gid))
2577        .collect();
2578
2579    let name = if temporary {
2580        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2581    } else {
2582        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2583    };
2584
2585    plan_utils::maybe_rename_columns(
2586        format!("view {}", scx.catalog.resolve_full_name(&name)),
2587        &mut desc,
2588        columns,
2589    )?;
2590    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2591
2592    if let Some(dup) = names.iter().duplicates().next() {
2593        sql_bail!("column {} specified more than once", dup.quoted());
2594    }
2595
2596    let view = View {
2597        create_sql,
2598        expr,
2599        dependencies,
2600        column_names: names,
2601        temporary,
2602    };
2603
2604    Ok((name, view))
2605}
2606
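/// Plans a `CREATE [OR REPLACE] VIEW` statement, additionally planning the
/// drop of the existing object when `OR REPLACE` applies.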
2607pub fn plan_create_view(
2608    scx: &StatementContext,
2609    mut stmt: CreateViewStatement<Aug>,
2610) -> Result<Plan, PlanError> {
2611    let CreateViewStatement {
2612        temporary,
2613        if_exists,
2614        definition,
2615    } = &mut stmt;
2616    let (name, view) = plan_view(scx, definition, *temporary)?;
2617
2618    // Override the statement-level IfExistsBehavior with Skip if this is
2619    // explicitly requested in the PlanContext (the default is `false`).
2620    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2621
2622    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2623        let if_exists = true;
2624        let cascade = false;
2625        let maybe_item_to_drop = plan_drop_item(
2626            scx,
2627            ObjectType::View,
2628            if_exists,
2629            definition.name.clone(),
2630            cascade,
2631        )?;
2632
2633        // Check if the new view depends on the item that we would be replacing.
2634        if let Some(id) = maybe_item_to_drop {
2635            let dependencies = view.expr.depends_on();
2636            let invalid_drop = scx
2637                .get_item(&id)
2638                .global_ids()
2639                .any(|gid| dependencies.contains(&gid));
2640            if invalid_drop {
2641                let item = scx.catalog.get_item(&id);
2642                sql_bail!(
2643                    "cannot replace view {0}: depended upon by new {0} definition",
2644                    scx.catalog.resolve_full_name(item.name())
2645                );
2646            }
2647
2648            Some(id)
2649        } else {
2650            None
2651        }
2652    } else {
2653        None
2654    };
2655    let drop_ids = replace
2656        .map(|id| {
2657            scx.catalog
2658                .item_dependents(id)
2659                .into_iter()
2660                .map(|id| id.unwrap_item_id())
2661                .collect()
2662        })
2663        .unwrap_or_default();
2664
2665    // Check for an object in the catalog with this same name.
2666    let full_name = scx.catalog.resolve_full_name(&name);
2667    let partial_name = PartialItemName::from(full_name.clone());
2668    // For PostgreSQL compatibility, we need to prevent creating views when
2669    // there is an existing object *or* type of the same name.
2670    if let (Ok(item), IfExistsBehavior::Error, false) = (
2671        scx.catalog.resolve_item_or_type(&partial_name),
2672        *if_exists,
2673        ignore_if_exists_errors,
2674    ) {
2675        return Err(PlanError::ItemAlreadyExists {
2676            name: full_name.to_string(),
2677            item_type: item.item_type(),
2678        });
2679    }
2680
2681    Ok(Plan::CreateView(CreateViewPlan {
2682        name,
2683        view,
2684        replace,
2685        drop_ids,
2686        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2687        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2688    }))
2689}
2690
2691pub fn describe_create_materialized_view(
2692    _: &StatementContext,
2693    _: CreateMaterializedViewStatement<Aug>,
2694) -> Result<StatementDesc, PlanError> {
2695    Ok(StatementDesc::new(None))
2696}
2697
2698pub fn describe_create_continual_task(
2699    _: &StatementContext,
2700    _: CreateContinualTaskStatement<Aug>,
2701) -> Result<StatementDesc, PlanError> {
2702    Ok(StatementDesc::new(None))
2703}
2704
2705pub fn describe_create_network_policy(
2706    _: &StatementContext,
2707    _: CreateNetworkPolicyStatement<Aug>,
2708) -> Result<StatementDesc, PlanError> {
2709    Ok(StatementDesc::new(None))
2710}
2711
2712pub fn describe_alter_network_policy(
2713    _: &StatementContext,
2714    _: AlterNetworkPolicyStatement<Aug>,
2715) -> Result<StatementDesc, PlanError> {
2716    Ok(StatementDesc::new(None))
2717}
2718
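/// Plans a `CREATE MATERIALIZED VIEW` statement, resolving the cluster the
/// materialized view will be installed on before normalizing the statement.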
2719pub fn plan_create_materialized_view(
2720    scx: &StatementContext,
2721    mut stmt: CreateMaterializedViewStatement<Aug>,
2722) -> Result<Plan, PlanError> {
2723    let cluster_id =
2724        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2725    stmt.in_cluster = Some(ResolvedClusterName {
2726        id: cluster_id,
2727        print_name: None,
2728    });
2729
2730    let create_sql =
2731        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2732
2733    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2734    let name = scx.allocate_qualified_name(partial_name.clone())?;
2735
2736    let query::PlannedRootQuery {
2737        expr,
2738        mut desc,
2739        finishing,
2740        scope: _,
2741    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // We get back a trivial finishing; see the comment in `plan_view`.
2743    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2744        &finishing,
2745        expr.arity()
2746    ));
2747    if expr.contains_parameters()? {
2748        return Err(PlanError::ParameterNotAllowed(
2749            "materialized views".to_string(),
2750        ));
2751    }
2752
2753    plan_utils::maybe_rename_columns(
2754        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2755        &mut desc,
2756        &stmt.columns,
2757    )?;
2758    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2759
2760    let MaterializedViewOptionExtracted {
2761        assert_not_null,
2762        partition_by,
2763        retain_history,
2764        refresh,
2765        seen: _,
2766    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2767
2768    if let Some(partition_by) = partition_by {
2769        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2770        check_partition_by(&desc, partition_by)?;
2771    }
2772
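    // Fold the (possibly repeated) REFRESH options into a single
    // `RefreshSchedule`. E.g. (hypothetical statement):
    //   CREATE MATERIALIZED VIEW mv
    //   WITH (REFRESH AT '2030-01-01 00:00:00',
    //         REFRESH EVERY '1 day' ALIGNED TO '2030-01-01 00:00:00')
    //   AS SELECT ...
    // contributes one entry to `refresh_schedule.ats` and one to
    // `refresh_schedule.everies`.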
2773    let refresh_schedule = {
2774        let mut refresh_schedule = RefreshSchedule::default();
2775        let mut on_commits_seen = 0;
2776        for refresh_option_value in refresh {
2777            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2778                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2779            }
2780            match refresh_option_value {
2781                RefreshOptionValue::OnCommit => {
2782                    on_commits_seen += 1;
2783                }
2784                RefreshOptionValue::AtCreation => {
2785                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2786                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2787                }
2788                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2789                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2790                    let ecx = &ExprContext {
2791                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2792                        name: "REFRESH AT",
2793                        scope: &Scope::empty(),
2794                        relation_type: &SqlRelationType::empty(),
2795                        allow_aggregates: false,
2796                        allow_subqueries: false,
2797                        allow_parameters: false,
2798                        allow_windows: false,
2799                    };
2800                    let hir = plan_expr(ecx, &time)?.cast_to(
2801                        ecx,
2802                        CastContext::Assignment,
2803                        &SqlScalarType::MzTimestamp,
2804                    )?;
2805                    // (mz_now was purified away to a literal earlier)
2806                    let timestamp = hir
2807                        .into_literal_mz_timestamp()
2808                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2809                    refresh_schedule.ats.push(timestamp);
2810                }
2811                RefreshOptionValue::Every(RefreshEveryOptionValue {
2812                    interval,
2813                    aligned_to,
2814                }) => {
2815                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2816                    if interval.as_microseconds() <= 0 {
2817                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2818                    }
2819                    if interval.months != 0 {
                        // This limitation exists because we want intervals to be
                        // cleanly convertible to a Unix epoch timestamp difference.
                        // That is no longer true when the interval involves months,
                        // because months have variable lengths. See `Timestamp::round_up`.
2824                        sql_bail!("REFRESH interval must not involve units larger than days");
2825                    }
2826                    let interval = interval.duration()?;
2827                    if u64::try_from(interval.as_millis()).is_err() {
2828                        sql_bail!("REFRESH interval too large");
2829                    }
2830
2831                    let mut aligned_to = match aligned_to {
2832                        Some(aligned_to) => aligned_to,
2833                        None => {
2834                            soft_panic_or_log!(
2835                                "ALIGNED TO should have been filled in by purification"
2836                            );
2837                            sql_bail!(
2838                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2839                            )
2840                        }
2841                    };
2842
2843                    // Desugar the `aligned_to` expression
2844                    transform_ast::transform(scx, &mut aligned_to)?;
2845
2846                    let ecx = &ExprContext {
2847                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2848                        name: "REFRESH EVERY ... ALIGNED TO",
2849                        scope: &Scope::empty(),
2850                        relation_type: &SqlRelationType::empty(),
2851                        allow_aggregates: false,
2852                        allow_subqueries: false,
2853                        allow_parameters: false,
2854                        allow_windows: false,
2855                    };
2856                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2857                        ecx,
2858                        CastContext::Assignment,
2859                        &SqlScalarType::MzTimestamp,
2860                    )?;
2861                    // (mz_now was purified away to a literal earlier)
2862                    let aligned_to_const = aligned_to_hir
2863                        .into_literal_mz_timestamp()
2864                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2865
2866                    refresh_schedule.everies.push(RefreshEvery {
2867                        interval,
2868                        aligned_to: aligned_to_const,
2869                    });
2870                }
2871            }
2872        }
2873
2874        if on_commits_seen > 1 {
2875            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2876        }
2877        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2878            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2879        }
2880
2881        if refresh_schedule == RefreshSchedule::default() {
2882            None
2883        } else {
2884            Some(refresh_schedule)
2885        }
2886    };
2887
2888    let as_of = stmt.as_of.map(Timestamp::from);
2889    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2890    let mut non_null_assertions = assert_not_null
2891        .into_iter()
2892        .map(normalize::column_name)
2893        .map(|assertion_name| {
2894            column_names
2895                .iter()
2896                .position(|col| col == &assertion_name)
2897                .ok_or_else(|| {
2898                    sql_err!(
2899                        "column {} in ASSERT NOT NULL option not found",
2900                        assertion_name.quoted()
2901                    )
2902                })
2903        })
2904        .collect::<Result<Vec<_>, _>>()?;
2905    non_null_assertions.sort();
2906    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2907        let dup = &column_names[*dup];
2908        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2909    }
2910
2911    if let Some(dup) = column_names.iter().duplicates().next() {
2912        sql_bail!("column {} specified more than once", dup.quoted());
2913    }
2914
2915    // Override the statement-level IfExistsBehavior with Skip if this is
2916    // explicitly requested in the PlanContext (the default is `false`).
2917    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2918        Ok(true) => IfExistsBehavior::Skip,
2919        _ => stmt.if_exists,
2920    };
2921
2922    let mut replace = None;
2923    let mut if_not_exists = false;
2924    match if_exists {
2925        IfExistsBehavior::Replace => {
2926            let if_exists = true;
2927            let cascade = false;
2928            let replace_id = plan_drop_item(
2929                scx,
2930                ObjectType::MaterializedView,
2931                if_exists,
2932                partial_name.clone().into(),
2933                cascade,
2934            )?;
2935
            // Check whether the new materialized view depends on the item that
            // we would be replacing.
2937            if let Some(id) = replace_id {
2938                let dependencies = expr.depends_on();
2939                let invalid_drop = scx
2940                    .get_item(&id)
2941                    .global_ids()
2942                    .any(|gid| dependencies.contains(&gid));
2943                if invalid_drop {
2944                    let item = scx.catalog.get_item(&id);
2945                    sql_bail!(
2946                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2947                        scx.catalog.resolve_full_name(item.name())
2948                    );
2949                }
2950                replace = Some(id);
2951            }
2952        }
2953        IfExistsBehavior::Skip => if_not_exists = true,
2954        IfExistsBehavior::Error => (),
2955    }
2956    let drop_ids = replace
2957        .map(|id| {
2958            scx.catalog
2959                .item_dependents(id)
2960                .into_iter()
2961                .map(|id| id.unwrap_item_id())
2962                .collect()
2963        })
2964        .unwrap_or_default();
2965    let mut dependencies: BTreeSet<_> = expr
2966        .depends_on()
2967        .into_iter()
2968        .map(|gid| scx.catalog.resolve_item_id(&gid))
2969        .collect();
2970
2971    // Validate the replacement target, if one is given.
2972    let mut replacement_target = None;
2973    if let Some(target_name) = &stmt.replacing {
2974        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
2975
2976        let target = scx.get_item_by_resolved_name(target_name)?;
2977        if target.item_type() != CatalogItemType::MaterializedView {
2978            return Err(PlanError::InvalidReplacement {
2979                item_type: target.item_type(),
2980                item_name: scx.catalog.minimal_qualification(target.name()),
2981                replacement_type: CatalogItemType::MaterializedView,
2982                replacement_name: partial_name,
2983            });
2984        }
2985        if target.id().is_system() {
2986            sql_bail!(
2987                "cannot replace {} because it is required by the database system",
2988                scx.catalog.minimal_qualification(target.name()),
2989            );
2990        }
2991
2992        if !dependencies.insert(target.id()) {
2993            sql_bail!(
2994                "cannot replace {} because it is also a dependency",
2995                scx.catalog.minimal_qualification(target.name()),
2996            );
2997        }
2998
2999        replacement_target = Some(target.id());
3000    }
3001
3002    // Check for an object in the catalog with this same name
3003    let full_name = scx.catalog.resolve_full_name(&name);
3004    let partial_name = PartialItemName::from(full_name.clone());
3005    // For PostgreSQL compatibility, we need to prevent creating materialized
3006    // views when there is an existing object *or* type of the same name.
3007    if let (IfExistsBehavior::Error, Ok(item)) =
3008        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3009    {
3010        return Err(PlanError::ItemAlreadyExists {
3011            name: full_name.to_string(),
3012            item_type: item.item_type(),
3013        });
3014    }
3015
3016    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3017        name,
3018        materialized_view: MaterializedView {
3019            create_sql,
3020            expr,
3021            dependencies: DependencyIds(dependencies),
3022            column_names,
3023            replacement_target,
3024            cluster_id,
3025            non_null_assertions,
3026            compaction_window,
3027            refresh_schedule,
3028            as_of,
3029        },
3030        replace,
3031        drop_ids,
3032        if_not_exists,
3033        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3034    }))
3035}
3036
3037generate_extracted_config!(
3038    MaterializedViewOption,
3039    (AssertNotNull, Ident, AllowMultiple),
3040    (PartitionBy, Vec<Ident>),
3041    (RetainHistory, OptionalDuration),
3042    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
3043);
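// The macro above generates `MaterializedViewOptionExtracted`, with one field
// per option plus a `seen` set used to reject duplicate options; options
// marked `AllowMultiple` (ASSERT NOT NULL, REFRESH) collect into a Vec. (This
// is a rough sketch of the generated shape, not its exact definition.)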
3044
3045pub fn plan_create_continual_task(
3046    scx: &StatementContext,
3047    mut stmt: CreateContinualTaskStatement<Aug>,
3048) -> Result<Plan, PlanError> {
3049    match &stmt.sugar {
3050        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3051        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3052            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3053        }
3054        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3055            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3056        }
3057    };
3058    let cluster_id = match &stmt.in_cluster {
3059        None => scx.catalog.resolve_cluster(None)?.id(),
3060        Some(in_cluster) => in_cluster.id,
3061    };
3062    stmt.in_cluster = Some(ResolvedClusterName {
3063        id: cluster_id,
3064        print_name: None,
3065    });
3066
3067    let create_sql =
3068        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3069
3070    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3071
    // It seems desirable for a CT that, e.g., simply filters the input to keep
    // the same nullability. So, start by assuming all columns are non-nullable,
    // and then make them nullable below if any of the exprs plans them as
    // nullable.
3076    let mut desc = match stmt.columns {
3077        None => None,
3078        Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
3080            for col in columns.iter() {
3081                desc_columns.push((
3082                    normalize::column_name(col.name.clone()),
3083                    SqlColumnType {
3084                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3085                        nullable: false,
3086                    },
3087                ));
3088            }
3089            Some(RelationDesc::from_names_and_types(desc_columns))
3090        }
3091    };
3092    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3093    match input.item_type() {
3094        // Input must be a thing directly backed by a persist shard, so we can
3095        // use a persist listen to efficiently rehydrate.
3096        CatalogItemType::ContinualTask
3097        | CatalogItemType::Table
3098        | CatalogItemType::MaterializedView
3099        | CatalogItemType::Source => {}
3100        CatalogItemType::Sink
3101        | CatalogItemType::View
3102        | CatalogItemType::Index
3103        | CatalogItemType::Type
3104        | CatalogItemType::Func
3105        | CatalogItemType::Secret
3106        | CatalogItemType::Connection => {
3107            sql_bail!(
3108                "CONTINUAL TASK cannot use {} as an input",
3109                input.item_type()
3110            );
3111        }
3112    }
3113
3114    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3115    let ct_name = stmt.name;
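    // The CT's statements may refer to the CT itself (e.g. the DELETE sugar
    // selects from it), so register it as a CTE under its placeholder id so
    // that those references resolve before the CT exists in the catalog.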
3116    let placeholder_id = match &ct_name {
3117        ResolvedItemName::ContinualTask { id, name } => {
3118            let desc = match desc.as_ref().cloned() {
3119                Some(x) => x,
3120                None => {
                    // The user didn't specify the CT's columns. Take a wild
                    // guess that the CT has the same shape as the input. It's
                    // fine if this is wrong; we'll get an error below after
                    // planning the query.
3125                    let desc = input.relation_desc().expect("item type checked above");
3126                    desc.into_owned()
3127                }
3128            };
3129            qcx.ctes.insert(
3130                *id,
3131                CteDesc {
3132                    name: name.item.clone(),
3133                    desc,
3134                },
3135            );
3136            Some(*id)
3137        }
3138        _ => None,
3139    };
3140
3141    let mut exprs = Vec::new();
3142    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3143        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3144        let query::PlannedRootQuery {
3145            expr,
3146            desc: desc_query,
3147            finishing,
3148            scope: _,
3149        } = query::plan_ct_query(&mut qcx, query)?;
        // We get back a trivial finishing because we plan with a "maintained"
        // QueryLifetime; see the comment in `plan_view`.
3152        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3153            &finishing,
3154            expr.arity()
3155        ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
3163        let expr = match desc.as_mut() {
3164            None => {
3165                desc = Some(desc_query);
3166                expr
3167            }
3168            Some(desc) => {
                // We specify the columns for DELETE, so if any column types
                // don't match, it must be an INSERT.
3171                if desc_query.arity() > desc.arity() {
3172                    sql_bail!(
3173                        "statement {}: INSERT has more expressions than target columns",
3174                        idx
3175                    );
3176                }
3177                if desc_query.arity() < desc.arity() {
3178                    sql_bail!(
3179                        "statement {}: INSERT has more target columns than expressions",
3180                        idx
3181                    );
3182                }
3183                // Ensure the types of the source query match the types of the target table,
3184                // installing assignment casts where necessary and possible.
3185                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3186                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3187                let expr = expr.map_err(|e| {
3188                    sql_err!(
3189                        "statement {}: column {} is of type {} but expression is of type {}",
3190                        idx,
3191                        desc.get_name(e.column).quoted(),
3192                        qcx.humanize_scalar_type(&e.target_type, false),
3193                        qcx.humanize_scalar_type(&e.source_type, false),
3194                    )
3195                })?;
3196
                // Update CT nullability as necessary. The arity checks above
                // verified that the two types have the same length.
3199                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3200                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3201                if updated {
3202                    let new_types = zip_types().map(|(ct, q)| {
3203                        let mut ct = ct.clone();
3204                        if q.nullable {
3205                            ct.nullable = true;
3206                        }
3207                        ct
3208                    });
3209                    *desc = RelationDesc::from_names_and_types(
3210                        desc.iter_names().cloned().zip_eq(new_types),
3211                    );
3212                }
3213
3214                expr
3215            }
3216        };
3217        match stmt {
3218            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3219            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3220        }
3221    }
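    // Each INSERT statement contributes its rows as additions and each DELETE
    // as retractions (via `negate`); unioning them below yields the net
    // changes the CT writes to its output. E.g. (illustrative), inserting and
    // deleting the same row in one iteration nets out to no change.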
3222    // TODO(ct3): Collect things by output and assert that there is only one (or
3223    // support multiple outputs).
3224    let expr = exprs
3225        .into_iter()
3226        .reduce(|acc, expr| acc.union(expr))
3227        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3228    let dependencies = expr
3229        .depends_on()
3230        .into_iter()
3231        .map(|gid| scx.catalog.resolve_item_id(&gid))
3232        .collect();
3233
3234    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3235    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3236    if let Some(dup) = column_names.iter().duplicates().next() {
3237        sql_bail!("column {} specified more than once", dup.quoted());
3238    }
3239
3240    // Check for an object in the catalog with this same name
3241    let name = match &ct_name {
3242        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3243        ResolvedItemName::ContinualTask { name, .. } => {
3244            let name = scx.allocate_qualified_name(name.clone())?;
3245            let full_name = scx.catalog.resolve_full_name(&name);
3246            let partial_name = PartialItemName::from(full_name.clone());
3247            // For PostgreSQL compatibility, we need to prevent creating this when there
3248            // is an existing object *or* type of the same name.
3249            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3250                return Err(PlanError::ItemAlreadyExists {
3251                    name: full_name.to_string(),
3252                    item_type: item.item_type(),
3253                });
3254            }
3255            name
3256        }
3257        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3258        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3259    };
3260
3261    let as_of = stmt.as_of.map(Timestamp::from);
3262    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3263        name,
3264        placeholder_id,
3265        desc,
3266        input_id: input.global_id(),
3267        with_snapshot: snapshot.unwrap_or(true),
3268        continual_task: MaterializedView {
3269            create_sql,
3270            expr,
3271            dependencies,
3272            column_names,
3273            replacement_target: None,
3274            cluster_id,
3275            non_null_assertions: Vec::new(),
3276            compaction_window: None,
3277            refresh_schedule: None,
3278            as_of,
3279        },
3280    }))
3281}
3282
3283fn continual_task_query<'a>(
3284    ct_name: &ResolvedItemName,
3285    stmt: &'a ast::ContinualTaskStmt<Aug>,
3286) -> Option<ast::Query<Aug>> {
3287    match stmt {
3288        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3289            table_name: _,
3290            columns,
3291            source,
3292            returning,
3293        }) => {
3294            if !columns.is_empty() || !returning.is_empty() {
3295                return None;
3296            }
3297            match source {
3298                ast::InsertSource::Query(query) => Some(query.clone()),
3299                ast::InsertSource::DefaultValues => None,
3300            }
3301        }
3302        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3303            table_name: _,
3304            alias,
3305            using,
3306            selection,
3307        }) => {
3308            if !using.is_empty() {
3309                return None;
3310            }
3311            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
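            // E.g. (hypothetical statement): `DELETE FROM ct WHERE x > 5`
            // becomes `SELECT * FROM ct WHERE x > 5`; planning that query and
            // negating it produces retractions for the matching rows.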
3312            let from = ast::TableWithJoins {
3313                relation: ast::TableFactor::Table {
3314                    name: ct_name.clone(),
3315                    alias: alias.clone(),
3316                },
3317                joins: Vec::new(),
3318            };
3319            let select = ast::Select {
3320                from: vec![from],
3321                selection: selection.clone(),
3322                distinct: None,
3323                projection: vec![ast::SelectItem::Wildcard],
3324                group_by: Vec::new(),
3325                having: None,
3326                qualify: None,
3327                options: Vec::new(),
3328            };
3329            let query = ast::Query {
3330                ctes: ast::CteBlock::Simple(Vec::new()),
3331                body: ast::SetExpr::Select(Box::new(select)),
3332                order_by: Vec::new(),
3333                limit: None,
3334                offset: None,
3335            };
3336            // Then negate it to turn it into retractions (after planning it).
3337            Some(query)
3338        }
3339    }
3340}
3341
3342generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3343
3344pub fn describe_create_sink(
3345    _: &StatementContext,
3346    _: CreateSinkStatement<Aug>,
3347) -> Result<StatementDesc, PlanError> {
3348    Ok(StatementDesc::new(None))
3349}
3350
3351generate_extracted_config!(
3352    CreateSinkOption,
3353    (Snapshot, bool),
3354    (PartitionStrategy, String),
3355    (Version, u64),
3356    (CommitInterval, Duration)
3357);
3358
3359pub fn plan_create_sink(
3360    scx: &StatementContext,
3361    stmt: CreateSinkStatement<Aug>,
3362) -> Result<Plan, PlanError> {
3363    // Check for an object in the catalog with this same name
3364    let Some(name) = stmt.name.clone() else {
3365        return Err(PlanError::MissingName(CatalogItemType::Sink));
3366    };
3367    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3368    let full_name = scx.catalog.resolve_full_name(&name);
3369    let partial_name = PartialItemName::from(full_name.clone());
3370    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3371        return Err(PlanError::ItemAlreadyExists {
3372            name: full_name.to_string(),
3373            item_type: item.item_type(),
3374        });
3375    }
3376
3377    plan_sink(scx, stmt)
3378}
3379
/// Plans a sink as if it did not already exist in the catalog, so that the
/// planning logic can be shared by CREATE SINK and ALTER SINK. It is the
/// responsibility of the callers (`plan_create_sink` and `plan_alter_sink`) to
/// check for name collisions where that matters.
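/// (Illustrative call sites: `plan_create_sink` checks the name for collisions
/// before delegating here, while ALTER SINK planning performs its own checks.)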
3384fn plan_sink(
3385    scx: &StatementContext,
3386    mut stmt: CreateSinkStatement<Aug>,
3387) -> Result<Plan, PlanError> {
3388    let CreateSinkStatement {
3389        name,
3390        in_cluster: _,
3391        from,
3392        connection,
3393        format,
3394        envelope,
3395        if_not_exists,
3396        with_options,
3397    } = stmt.clone();
3398
3399    let Some(name) = name else {
3400        return Err(PlanError::MissingName(CatalogItemType::Sink));
3401    };
3402    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3403
3404    let envelope = match envelope {
3405        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3406        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3407        None => sql_bail!("ENVELOPE clause is required"),
3408    };
3409
3410    let from_name = &from;
3411    let from = scx.get_item_by_resolved_name(&from)?;
3412
3413    {
3414        use CatalogItemType::*;
3415        match from.item_type() {
3416            Table | Source | MaterializedView | ContinualTask => {}
3417            Sink | View | Index | Type | Func | Secret | Connection => {
3418                let name = scx.catalog.minimal_qualification(from.name());
3419                return Err(PlanError::InvalidSinkFrom {
3420                    name: name.to_string(),
3421                    item_type: from.item_type(),
3422                });
3423            }
3424        }
3425    }
3426
3427    if from.id().is_system() {
3428        bail_unsupported!("creating a sink directly on a catalog object");
3429    }
3430
3431    let desc = from.relation_desc().expect("item type checked above");
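    // Resolve a user-declared KEY (e.g., hypothetically, `KEY (customer_id)`)
    // to column positions in `desc`; `key_indices` stays `None` when no KEY
    // clause was given.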
3432    let key_indices = match &connection {
3433        CreateSinkConnection::Kafka { key: Some(key), .. }
3434        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3435            let key_columns = key
3436                .key_columns
3437                .clone()
3438                .into_iter()
3439                .map(normalize::column_name)
3440                .collect::<Vec<_>>();
3441            let mut uniq = BTreeSet::new();
3442            for col in key_columns.iter() {
3443                if !uniq.insert(col) {
3444                    sql_bail!("duplicate column referenced in KEY: {}", col);
3445                }
3446            }
            let indices = key_columns
                .iter()
                .map(|col| -> Result<usize, PlanError> {
                    let name_idx = desc
                        .get_by_name(col)
                        .map(|(idx, _type)| idx)
                        .ok_or_else(|| {
                            sql_err!("column referenced in KEY does not exist: {}", col)
                        })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        return Err(sql_err!(
                            "column referenced in KEY is ambiguous: {}",
                            col
                        ));
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;
3462            let is_valid_key = desc
3463                .typ()
3464                .keys
3465                .iter()
3466                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3467
3468            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3469                if key.not_enforced {
3470                    scx.catalog
3471                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3472                            key: key_columns.clone(),
3473                            name: name.item.clone(),
3474                        })
3475                } else {
3476                    return Err(PlanError::UpsertSinkWithInvalidKey {
3477                        name: from_name.full_name_str(),
3478                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3479                        valid_keys: desc
3480                            .typ()
3481                            .keys
3482                            .iter()
3483                            .map(|key| {
3484                                key.iter()
3485                                    .map(|col| desc.get_name(*col).as_str().into())
3486                                    .collect()
3487                            })
3488                            .collect(),
3489                    });
3490                }
3491            }
3492            Some(indices)
3493        }
3494        CreateSinkConnection::Kafka { key: None, .. }
3495        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3496    };
3497
3498    let headers_index = match &connection {
3499        CreateSinkConnection::Kafka {
3500            headers: Some(headers),
3501            ..
3502        } => {
3503            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3504
3505            match envelope {
3506                SinkEnvelope::Upsert => (),
3507                SinkEnvelope::Debezium => {
3508                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3509                }
3510            };
3511
3512            let headers = normalize::column_name(headers.clone());
3513            let (idx, ty) = desc
3514                .get_by_name(&headers)
3515                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3516
3517            if desc.get_unambiguous_name(idx).is_none() {
3518                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3519            }
3520
3521            match &ty.scalar_type {
3522                SqlScalarType::Map { value_type, .. }
3523                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3524                _ => sql_bail!(
3525                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3526                ),
3527            }
3528
3529            Some(idx)
3530        }
3531        _ => None,
3532    };
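    // E.g. (illustrative): with `HEADERS headers_col`, where `headers_col` has
    // type `map[text => bytea]`, each row's map entries are emitted as the
    // headers of the corresponding Kafka message.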
3533
    // Pick the first valid natural relation key, if any.
    let relation_key_indices = desc.typ().keys.first().cloned();
3536
3537    let key_desc_and_indices = key_indices.map(|key_indices| {
3538        let cols = desc
3539            .iter()
3540            .map(|(name, ty)| (name.clone(), ty.clone()))
3541            .collect::<Vec<_>>();
3542        let (names, types): (Vec<_>, Vec<_>) =
3543            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3544        let typ = SqlRelationType::new(types);
3545        (RelationDesc::new(typ, names), key_indices)
3546    });
3547
3548    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3549        return Err(PlanError::UpsertSinkWithoutKey);
3550    }
3551
3552    let CreateSinkOptionExtracted {
3553        snapshot,
3554        version,
3555        partition_strategy: _,
3556        seen: _,
3557        commit_interval,
3558    } = with_options.try_into()?;
3559
3560    let connection_builder = match connection {
3561        CreateSinkConnection::Kafka {
3562            connection,
3563            options,
3564            ..
3565        } => kafka_sink_builder(
3566            scx,
3567            connection,
3568            options,
3569            format,
3570            relation_key_indices,
3571            key_desc_and_indices,
3572            headers_index,
3573            desc.into_owned(),
3574            envelope,
3575            from.id(),
3576            commit_interval,
3577        )?,
3578        CreateSinkConnection::Iceberg {
3579            connection,
3580            aws_connection,
3581            options,
3582            ..
3583        } => iceberg_sink_builder(
3584            scx,
3585            connection,
3586            aws_connection,
3587            options,
3588            relation_key_indices,
3589            key_desc_and_indices,
3590            commit_interval,
3591        )?,
3592    };
3593
3594    // WITH SNAPSHOT defaults to true
3595    let with_snapshot = snapshot.unwrap_or(true);
3596    // VERSION defaults to 0
3597    let version = version.unwrap_or(0);
3598
3599    // We will rewrite the cluster if one is not provided, so we must use the
3600    // `in_cluster` value we plan to normalize when we canonicalize the create
3601    // statement.
3602    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3603    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3604
3605    Ok(Plan::CreateSink(CreateSinkPlan {
3606        name,
3607        sink: Sink {
3608            create_sql,
3609            from: from.global_id(),
3610            connection: connection_builder,
3611            envelope,
3612            version,
3613            commit_interval,
3614        },
3615        with_snapshot,
3616        if_not_exists,
3617        in_cluster: in_cluster.id(),
3618    }))
3619}
3620
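/// Builds the error reported when a user-declared KEY does not match any of
/// the relation's known unique keys, e.g. (illustrative output):
/// `Key constraint (a, b) conflicts with existing key (a)`.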
3621fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3622    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3623
3624    let existing_keys = desc
3625        .typ()
3626        .keys
3627        .iter()
3628        .map(|key_columns| {
3629            key_columns
3630                .iter()
3631                .map(|col| desc.get_name(*col).as_str())
3632                .join(", ")
3633        })
3634        .join(", ");
3635
3636    sql_err!(
3637        "Key constraint ({}) conflicts with existing key ({})",
3638        user_keys,
3639        existing_keys
3640    )
3641}
3642
/// Created by hand instead of with the `generate_extracted_config!` macro
/// because that macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3645#[derive(Debug, Default, PartialEq, Clone)]
3646pub struct CsrConfigOptionExtracted {
3647    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3648    pub(crate) avro_key_fullname: Option<String>,
3649    pub(crate) avro_value_fullname: Option<String>,
3650    pub(crate) null_defaults: bool,
3651    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3652    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3653    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3654    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3655}
3656
3657impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3658    type Error = crate::plan::PlanError;
3659    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3660        let mut extracted = CsrConfigOptionExtracted::default();
3661        let mut common_doc_comments = BTreeMap::new();
3662        for option in v {
3663            if !extracted.seen.insert(option.name.clone()) {
3664                return Err(PlanError::Unstructured({
3665                    format!("{} specified more than once", option.name)
3666                }));
3667            }
3668            let option_name = option.name.clone();
3669            let option_name_str = option_name.to_ast_string_simple();
3670            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3671                option_name: option_name.to_ast_string_simple(),
3672                err: e.into(),
3673            };
3674            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3675                val.map(|s| match s {
3676                    WithOptionValue::Value(Value::String(s)) => {
3677                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3678                    }
3679                    _ => Err("must be a string".to_string()),
3680                })
3681                .transpose()
3682                .map_err(PlanError::Unstructured)
3683                .map_err(better_error)
3684            };
3685            match option.name {
3686                CsrConfigOptionName::AvroKeyFullname => {
3687                    extracted.avro_key_fullname =
3688                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3689                }
3690                CsrConfigOptionName::AvroValueFullname => {
3691                    extracted.avro_value_fullname =
3692                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3693                }
3694                CsrConfigOptionName::NullDefaults => {
3695                    extracted.null_defaults =
3696                        <bool>::try_from_value(option.value).map_err(better_error)?;
3697                }
3698                CsrConfigOptionName::AvroDocOn(doc_on) => {
3699                    let value = String::try_from_value(option.value.ok_or_else(|| {
3700                        PlanError::InvalidOptionValue {
3701                            option_name: option_name_str,
3702                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3703                        }
3704                    })?)
3705                    .map_err(better_error)?;
3706                    let key = match doc_on.identifier {
3707                        DocOnIdentifier::Column(ast::ColumnName {
3708                            relation: ResolvedItemName::Item { id, .. },
3709                            column: ResolvedColumnReference::Column { name, index: _ },
3710                        }) => DocTarget::Field {
3711                            object_id: id,
3712                            column_name: name,
3713                        },
3714                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3715                            DocTarget::Type(id)
3716                        }
3717                        _ => unreachable!(),
3718                    };
3719
3720                    match doc_on.for_schema {
3721                        DocOnSchema::KeyOnly => {
3722                            extracted.key_doc_options.insert(key, value);
3723                        }
3724                        DocOnSchema::ValueOnly => {
3725                            extracted.value_doc_options.insert(key, value);
3726                        }
3727                        DocOnSchema::All => {
3728                            common_doc_comments.insert(key, value);
3729                        }
3730                    }
3731                }
3732                CsrConfigOptionName::KeyCompatibilityLevel => {
3733                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3734                }
3735                CsrConfigOptionName::ValueCompatibilityLevel => {
3736                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3737                }
3738            }
3739        }
3740
3741        for (key, value) in common_doc_comments {
3742            if !extracted.key_doc_options.contains_key(&key) {
3743                extracted.key_doc_options.insert(key.clone(), value.clone());
3744            }
3745            if !extracted.value_doc_options.contains_key(&key) {
3746                extracted.value_doc_options.insert(key, value);
3747            }
3748        }
3749        Ok(extracted)
3750    }
3751}
3752
3753fn iceberg_sink_builder(
3754    scx: &StatementContext,
3755    catalog_connection: ResolvedItemName,
3756    aws_connection: ResolvedItemName,
3757    options: Vec<IcebergSinkConfigOption<Aug>>,
3758    relation_key_indices: Option<Vec<usize>>,
3759    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3760    commit_interval: Option<Duration>,
3761) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3762    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3763    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3764    let catalog_connection_id = catalog_connection_item.id();
3765    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3766    let aws_connection_id = aws_connection_item.id();
3767    if !matches!(
3768        catalog_connection_item.connection()?,
3769        Connection::IcebergCatalog(_)
3770    ) {
3771        sql_bail!(
3772            "{} is not an iceberg catalog connection",
3773            scx.catalog
3774                .resolve_full_name(catalog_connection_item.name())
3775                .to_string()
3776                .quoted()
3777        );
3778    };
3779
3780    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3781        sql_bail!(
3782            "{} is not an AWS connection",
3783            scx.catalog
3784                .resolve_full_name(aws_connection_item.name())
3785                .to_string()
3786                .quoted()
3787        );
3788    }
3789
3790    let IcebergSinkConfigOptionExtracted {
3791        table,
3792        namespace,
3793        seen: _,
3794    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3795
3796    let Some(table) = table else {
3797        sql_bail!("Iceberg sink must specify TABLE");
3798    };
3799    let Some(namespace) = namespace else {
3800        sql_bail!("Iceberg sink must specify NAMESPACE");
3801    };
3802    if commit_interval.is_none() {
3803        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3804    }
3805
3806    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3807        catalog_connection_id,
3808        catalog_connection: catalog_connection_id,
3809        aws_connection_id,
3810        aws_connection: aws_connection_id,
3811        table,
3812        namespace,
3813        relation_key_indices,
3814        key_desc_and_indices,
3815    }))
3816}
3817
3818fn kafka_sink_builder(
3819    scx: &StatementContext,
3820    connection: ResolvedItemName,
3821    options: Vec<KafkaSinkConfigOption<Aug>>,
3822    format: Option<FormatSpecifier<Aug>>,
3823    relation_key_indices: Option<Vec<usize>>,
3824    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3825    headers_index: Option<usize>,
3826    value_desc: RelationDesc,
3827    envelope: SinkEnvelope,
3828    sink_from: CatalogItemId,
3829    commit_interval: Option<Duration>,
3830) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3831    // Get Kafka connection.
3832    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3833    let connection_id = connection_item.id();
3834    match connection_item.connection()? {
3835        Connection::Kafka(_) => (),
3836        _ => sql_bail!(
3837            "{} is not a kafka connection",
3838            scx.catalog.resolve_full_name(connection_item.name())
3839        ),
3840    };
3841
3842    if commit_interval.is_some() {
3843        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
3844    }
3845
3846    let KafkaSinkConfigOptionExtracted {
3847        topic,
3848        compression_type,
3849        partition_by,
3850        progress_group_id_prefix,
3851        transactional_id_prefix,
3852        legacy_ids,
3853        topic_config,
3854        topic_metadata_refresh_interval,
3855        topic_partition_count,
3856        topic_replication_factor,
3857        seen: _,
3858    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3859
3860    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3861        (Some(_), Some(true)) => {
3862            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3863        }
3864        (None, Some(true)) => KafkaIdStyle::Legacy,
3865        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3866    };
3867
3868    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3869        (Some(_), Some(true)) => {
3870            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3871        }
3872        (None, Some(true)) => KafkaIdStyle::Legacy,
3873        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3874    };
3875
3876    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3877
3878    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3879        // This is a librdkafka-enforced restriction that, if violated,
3880        // would result in a runtime error for the source.
3881        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3882    }
3883
3884    let assert_positive = |val: Option<i32>, name: &str| {
3885        if let Some(val) = val {
3886            if val <= 0 {
3887                sql_bail!("{} must be a positive integer", name);
3888            }
3889        }
3890        val.map(NonNeg::try_from)
3891            .transpose()
3892            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3893    };
3894    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3895    let topic_replication_factor =
3896        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3897
    // Helper closure to parse Avro connection options for format specifiers
    // that use Avro for either key or value encoding.
3900    let gen_avro_schema_options = |conn| {
3901        let CsrConnectionAvro {
3902            connection:
3903                CsrConnection {
3904                    connection,
3905                    options,
3906                },
3907            seed,
3908            key_strategy,
3909            value_strategy,
3910        } = conn;
3911        if seed.is_some() {
3912            sql_bail!("SEED option does not make sense with sinks");
3913        }
3914        if key_strategy.is_some() {
3915            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3916        }
3917        if value_strategy.is_some() {
3918            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3919        }
3920
3921        let item = scx.get_item_by_resolved_name(&connection)?;
3922        let csr_connection = match item.connection()? {
3923            Connection::Csr(_) => item.id(),
3924            _ => {
3925                sql_bail!(
3926                    "{} is not a schema registry connection",
3927                    scx.catalog
3928                        .resolve_full_name(item.name())
3929                        .to_string()
3930                        .quoted()
3931                )
3932            }
3933        };
3934        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3935
3936        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3937            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3938        }
3939
3940        if key_desc_and_indices.is_some()
3941            && (extracted_options.avro_key_fullname.is_some()
3942                ^ extracted_options.avro_value_fullname.is_some())
3943        {
3944            sql_bail!(
3945                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3946            );
3947        }
3948
3949        Ok((csr_connection, extracted_options))
3950    };
3951
3952    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3953        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3954        Format::Bytes if desc.arity() == 1 => {
3955            let col_type = &desc.typ().column_types[0].scalar_type;
3956            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3957                bail_unsupported!(format!(
3958                    "BYTES format with non-encodable type: {:?}",
3959                    col_type
3960                ));
3961            }
3962
3963            Ok(KafkaSinkFormatType::Bytes)
3964        }
3965        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3966        Format::Bytes | Format::Text => {
3967            bail_unsupported!("BYTES or TEXT format with multiple columns")
3968        }
3969        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3970        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3971            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3972            let schema = if is_key {
3973                AvroSchemaGenerator::new(
3974                    desc.clone(),
3975                    false,
3976                    options.key_doc_options,
3977                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3978                    options.null_defaults,
3979                    Some(sink_from),
3980                    false,
3981                )?
3982                .schema()
3983                .to_string()
3984            } else {
3985                AvroSchemaGenerator::new(
3986                    desc.clone(),
3987                    matches!(envelope, SinkEnvelope::Debezium),
3988                    options.value_doc_options,
3989                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3990                    options.null_defaults,
3991                    Some(sink_from),
3992                    true,
3993                )?
3994                .schema()
3995                .to_string()
3996            };
3997            Ok(KafkaSinkFormatType::Avro {
3998                schema,
3999                compatibility_level: if is_key {
4000                    options.key_compatibility_level
4001                } else {
4002                    options.value_compatibility_level
4003                },
4004                csr_connection,
4005            })
4006        }
4007        format => bail_unsupported!(format!("sink format {:?}", format)),
4008    };
4009
4010    let partition_by = match &partition_by {
4011        Some(partition_by) => {
4012            let mut scope = Scope::from_source(None, value_desc.iter_names());
4013
4014            match envelope {
4015                SinkEnvelope::Upsert => (),
4016                SinkEnvelope::Debezium => {
4017                    let key_indices: HashSet<_> = key_desc_and_indices
4018                        .as_ref()
4019                        .map(|(_desc, indices)| indices.as_slice())
4020                        .unwrap_or_default()
4021                        .into_iter()
4022                        .collect();
4023                    for (i, item) in scope.items.iter_mut().enumerate() {
4024                        if !key_indices.contains(&i) {
4025                            item.error_if_referenced = Some(|_table, column| {
4026                                PlanError::InvalidPartitionByEnvelopeDebezium {
4027                                    column_name: column.to_string(),
4028                                }
4029                            });
4030                        }
4031                    }
4032                }
4033            };
4034
4035            let ecx = &ExprContext {
4036                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
4037                name: "PARTITION BY",
4038                scope: &scope,
4039                relation_type: value_desc.typ(),
4040                allow_aggregates: false,
4041                allow_subqueries: false,
4042                allow_parameters: false,
4043                allow_windows: false,
4044            };
4045            let expr = plan_expr(ecx, partition_by)?.cast_to(
4046                ecx,
4047                CastContext::Assignment,
4048                &SqlScalarType::UInt64,
4049            )?;
4050            let expr = expr.lower_uncorrelated()?;
4051
4052            Some(expr)
4053        }
4054        _ => None,
4055    };
4056
4057    // Map from the format specifier of the statement to the individual key/value formats for the sink.
4058    let format = match format {
4059        Some(FormatSpecifier::KeyValue { key, value }) => {
4060            let key_format = match key_desc_and_indices.as_ref() {
4061                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4062                None => None,
4063            };
4064            KafkaSinkFormat {
4065                value_format: map_format(value, &value_desc, false)?,
4066                key_format,
4067            }
4068        }
4069        Some(FormatSpecifier::Bare(format)) => {
4070            let key_format = match key_desc_and_indices.as_ref() {
4071                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4072                None => None,
4073            };
4074            KafkaSinkFormat {
4075                value_format: map_format(format, &value_desc, false)?,
4076                key_format,
4077            }
4078        }
4079        None => bail_unsupported!("sink without format"),
4080    };
4081
4082    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4083        connection_id,
4084        connection: connection_id,
4085        format,
4086        topic: topic_name,
4087        relation_key_indices,
4088        key_desc_and_indices,
4089        headers_index,
4090        value_desc,
4091        partition_by,
4092        compression_type,
4093        progress_group_id,
4094        transactional_id,
4095        topic_options: KafkaTopicOptions {
4096            partition_count: topic_partition_count,
4097            replication_factor: topic_replication_factor,
4098            topic_config: topic_config.unwrap_or_default(),
4099        },
4100        topic_metadata_refresh_interval,
4101    }))
4102}
4103
4104pub fn describe_create_index(
4105    _: &StatementContext,
4106    _: CreateIndexStatement<Aug>,
4107) -> Result<StatementDesc, PlanError> {
4108    Ok(StatementDesc::new(None))
4109}
4110
4111pub fn plan_create_index(
4112    scx: &StatementContext,
4113    mut stmt: CreateIndexStatement<Aug>,
4114) -> Result<Plan, PlanError> {
4115    let CreateIndexStatement {
4116        name,
4117        on_name,
4118        in_cluster,
4119        key_parts,
4120        with_options,
4121        if_not_exists,
4122    } = &mut stmt;
4123    let on = scx.get_item_by_resolved_name(on_name)?;
4124    let on_item_type = on.item_type();
4125
4126    if !matches!(
4127        on_item_type,
4128        CatalogItemType::View
4129            | CatalogItemType::MaterializedView
4130            | CatalogItemType::Source
4131            | CatalogItemType::Table
4132    ) {
4133        sql_bail!(
4134            "index cannot be created on {} because it is a {}",
4135            on_name.full_name_str(),
4136            on.item_type()
4137        )
4138    }
4139
4140    let on_desc = on.relation_desc().expect("item type checked above");
4141
4142    let filled_key_parts = match key_parts {
4143        Some(kp) => kp.to_vec(),
4144        None => {
4145            // `key_parts` is None if we're creating a "default" index.
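            // e.g., `CREATE DEFAULT INDEX ON t` takes this path: the key columns
            // come from the relation's default key, not from the statement.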
4146            let key = on_desc.typ().default_key();
4147            key.iter()
4148                .map(|i| match on_desc.get_unambiguous_name(*i) {
4149                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
4150                    _ => Expr::Value(Value::Number((i + 1).to_string())),
4151                })
4152                .collect()
4153        }
4154    };
4155    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4156
4157    let index_name = if let Some(name) = name {
4158        QualifiedItemName {
4159            qualifiers: on.name().qualifiers.clone(),
4160            item: normalize::ident(name.clone()),
4161        }
4162    } else {
4163        let mut idx_name = QualifiedItemName {
4164            qualifiers: on.name().qualifiers.clone(),
4165            item: on.name().item.clone(),
4166        };
4167        if key_parts.is_none() {
4168            // We're trying to create the "default" index.
4169            idx_name.item += "_primary_idx";
4170        } else {
4171            // Follow PostgreSQL's scheme for automatically naming indexes:
4172            // `<table>_<_-separated indexed expressions>_idx`
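            // For example (hypothetical objects): `CREATE INDEX ON t (a, b + 1)`
            // would be named `t_a_expr_idx`.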
4173            let index_name_col_suffix = keys
4174                .iter()
4175                .map(|k| match k {
4176                    mz_expr::MirScalarExpr::Column(i, name) => {
4177                        match (on_desc.get_unambiguous_name(*i), &name.0) {
4178                            (Some(col_name), _) => col_name.to_string(),
4179                            (None, Some(name)) => name.to_string(),
4180                            (None, None) => format!("{}", i + 1),
4181                        }
4182                    }
4183                    _ => "expr".to_string(),
4184                })
4185                .join("_");
4186            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4187                .expect("write on strings cannot fail");
4188            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4189        }
4190
4191        if !*if_not_exists {
4192            scx.catalog.find_available_name(idx_name)
4193        } else {
4194            idx_name
4195        }
4196    };
4197
4198    // Check for an object in the catalog with this same name
4199    let full_name = scx.catalog.resolve_full_name(&index_name);
4200    let partial_name = PartialItemName::from(full_name.clone());
4201    // For PostgreSQL compatibility, we need to prevent creating indexes when
4202    // there is an existing object *or* type of the same name.
4203    //
4204    // Technically, we only need to prevent coexistence of indexes and types
4205    // that have an associated relation (record types but not list/map types).
4206    // Enforcing that would be more complicated, though. It's backwards
4207    // compatible to weaken this restriction in the future.
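    // e.g., if a table `foo` already exists in the target schema, `CREATE INDEX
    // foo ...` is rejected below with `ItemAlreadyExists` (absent IF NOT EXISTS).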
4208    if let (Ok(item), false, false) = (
4209        scx.catalog.resolve_item_or_type(&partial_name),
4210        *if_not_exists,
4211        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4212    ) {
4213        return Err(PlanError::ItemAlreadyExists {
4214            name: full_name.to_string(),
4215            item_type: item.item_type(),
4216        });
4217    }
4218
4219    let options = plan_index_options(scx, with_options.clone())?;
4220    let cluster_id = match in_cluster {
4221        None => scx.resolve_cluster(None)?.id(),
4222        Some(in_cluster) => in_cluster.id,
4223    };
4224
4225    *in_cluster = Some(ResolvedClusterName {
4226        id: cluster_id,
4227        print_name: None,
4228    });
4229
4230    // Normalize `stmt`.
4231    *name = Some(Ident::new(index_name.item.clone())?);
4232    *key_parts = Some(filled_key_parts);
4233    let if_not_exists = *if_not_exists;
4234
4235    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
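    // e.g., `WITH (RETAIN HISTORY = FOR '1hr')` on the statement surfaces below as
    // a `CompactionWindow` for the index.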
4236    let compaction_window = options.iter().find_map(|o| {
4237        #[allow(irrefutable_let_patterns)]
4238        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4239            Some(lcw.clone())
4240        } else {
4241            None
4242        }
4243    });
4244
4245    Ok(Plan::CreateIndex(CreateIndexPlan {
4246        name: index_name,
4247        index: Index {
4248            create_sql,
4249            on: on.global_id(),
4250            keys,
4251            cluster_id,
4252            compaction_window,
4253        },
4254        if_not_exists,
4255    }))
4256}
4257
4258pub fn describe_create_type(
4259    _: &StatementContext,
4260    _: CreateTypeStatement<Aug>,
4261) -> Result<StatementDesc, PlanError> {
4262    Ok(StatementDesc::new(None))
4263}
4264
4265pub fn plan_create_type(
4266    scx: &StatementContext,
4267    stmt: CreateTypeStatement<Aug>,
4268) -> Result<Plan, PlanError> {
4269    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4270    let CreateTypeStatement { name, as_type, .. } = stmt;
4271
4272    fn validate_data_type(
4273        scx: &StatementContext,
4274        data_type: ResolvedDataType,
4275        as_type: &str,
4276        key: &str,
4277    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4278        let (id, modifiers) = match data_type {
4279            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4280            _ => sql_bail!(
4281                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4282                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4283                as_type,
4284                key,
4285                data_type.human_readable_name(),
4286            ),
4287        };
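        // e.g., `CREATE TYPE t AS LIST (ELEMENT TYPE = int4 list)` lands in the
        // bail above: the inline `int4 list` is an unnamed (anonymous) type.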
4288
4289        let item = scx.catalog.get_item(&id);
4290        match item.type_details() {
4291            None => sql_bail!(
4292                "{} must be of class type, but received {} which is of class {}",
4293                key,
4294                scx.catalog.resolve_full_name(item.name()),
4295                item.item_type()
4296            ),
4297            Some(CatalogTypeDetails {
4298                typ: CatalogType::Char,
4299                ..
4300            }) => {
4301                bail_unsupported!("embedding char type in a list or map")
4302            }
4303            _ => {
4304                // Validate that the modifiers are actually valid.
4305                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4306
4307                Ok((id, modifiers))
4308            }
4309        }
4310    }
4311
4312    let inner = match as_type {
4313        CreateTypeAs::List { options } => {
4314            let CreateTypeListOptionExtracted {
4315                element_type,
4316                seen: _,
4317            } = CreateTypeListOptionExtracted::try_from(options)?;
4318            let element_type =
4319                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4320            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4321            CatalogType::List {
4322                element_reference: id,
4323                element_modifiers: modifiers,
4324            }
4325        }
4326        CreateTypeAs::Map { options } => {
4327            let CreateTypeMapOptionExtracted {
4328                key_type,
4329                value_type,
4330                seen: _,
4331            } = CreateTypeMapOptionExtracted::try_from(options)?;
4332            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4333            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4334            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4335            let (value_id, value_modifiers) =
4336                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4337            CatalogType::Map {
4338                key_reference: key_id,
4339                key_modifiers,
4340                value_reference: value_id,
4341                value_modifiers,
4342            }
4343        }
4344        CreateTypeAs::Record { column_defs } => {
4345            let mut fields = vec![];
4346            for column_def in column_defs {
4347                let data_type = column_def.data_type;
4348                let key = ident(column_def.name.clone());
4349                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4350                fields.push(CatalogRecordField {
4351                    name: ColumnName::from(key.clone()),
4352                    type_reference: id,
4353                    type_modifiers: modifiers,
4354                });
4355            }
4356            CatalogType::Record { fields }
4357        }
4358    };
4359
4360    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4361
4362    // Check for an object in the catalog with this same name
4363    let full_name = scx.catalog.resolve_full_name(&name);
4364    let partial_name = PartialItemName::from(full_name.clone());
4365    // For PostgreSQL compatibility, we need to prevent creating types when
4366    // there is an existing object *or* type of the same name.
4367    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4368        if item.item_type().conflicts_with_type() {
4369            return Err(PlanError::ItemAlreadyExists {
4370                name: full_name.to_string(),
4371                item_type: item.item_type(),
4372            });
4373        }
4374    }
4375
4376    Ok(Plan::CreateType(CreateTypePlan {
4377        name,
4378        typ: Type { create_sql, inner },
4379    }))
4380}
4381
4382generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4383
4384generate_extracted_config!(
4385    CreateTypeMapOption,
4386    (KeyType, ResolvedDataType),
4387    (ValueType, ResolvedDataType)
4388);
4389
4390#[derive(Debug)]
4391pub enum PlannedAlterRoleOption {
4392    Attributes(PlannedRoleAttributes),
4393    Variable(PlannedRoleVariable),
4394}
4395
4396#[derive(Debug, Clone)]
4397pub struct PlannedRoleAttributes {
4398    pub inherit: Option<bool>,
4399    pub password: Option<Password>,
4400    pub scram_iterations: Option<NonZeroU32>,
4401    /// `nopassword` is set to true if the password from the parser is `None`.
4402    /// This is semantically different from not supplying a password at all,
4403    /// and allows for unsetting a password.
4404    pub nopassword: Option<bool>,
4405    pub superuser: Option<bool>,
4406    pub login: Option<bool>,
4407}
4408
4409fn plan_role_attributes(
4410    options: Vec<RoleAttribute>,
4411    scx: &StatementContext,
4412) -> Result<PlannedRoleAttributes, PlanError> {
4413    let mut planned_attributes = PlannedRoleAttributes {
4414        inherit: None,
4415        password: None,
4416        scram_iterations: None,
4417        superuser: None,
4418        login: None,
4419        nopassword: None,
4420    };
4421
4422    for option in options {
4423        match option {
4424            RoleAttribute::Inherit | RoleAttribute::NoInherit
4425                if planned_attributes.inherit.is_some() =>
4426            {
4427                sql_bail!("conflicting or redundant options");
4428            }
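            // e.g., `CREATE ROLE r INHERIT NOINHERIT` is caught by the guard above.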
4429            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4430                bail_never_supported!(
4431                    "CREATECLUSTER attribute",
4432                    "sql/create-role/#details",
4433                    "Use system privileges instead."
4434                );
4435            }
4436            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4437                bail_never_supported!(
4438                    "CREATEDB attribute",
4439                    "sql/create-role/#details",
4440                    "Use system privileges instead."
4441                );
4442            }
4443            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4444                bail_never_supported!(
4445                    "CREATEROLE attribute",
4446                    "sql/create-role/#details",
4447                    "Use system privileges instead."
4448                );
4449            }
4450            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4451                sql_bail!("conflicting or redundant options");
4452            }
4453
4454            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4455            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4456            RoleAttribute::Password(password) => {
4457                if let Some(password) = password {
4458                    planned_attributes.password = Some(password.into());
4459                    planned_attributes.scram_iterations =
4460                        Some(scx.catalog.system_vars().scram_iterations())
4461                } else {
4462                    planned_attributes.nopassword = Some(true);
4463                }
4464            }
4465            RoleAttribute::SuperUser => {
4466                if planned_attributes.superuser == Some(false) {
4467                    sql_bail!("conflicting or redundant options");
4468                }
4469                planned_attributes.superuser = Some(true);
4470            }
4471            RoleAttribute::NoSuperUser => {
4472                if planned_attributes.superuser == Some(true) {
4473                    sql_bail!("conflicting or redundant options");
4474                }
4475                planned_attributes.superuser = Some(false);
4476            }
4477            RoleAttribute::Login => {
4478                if planned_attributes.login == Some(false) {
4479                    sql_bail!("conflicting or redundant options");
4480                }
4481                planned_attributes.login = Some(true);
4482            }
4483            RoleAttribute::NoLogin => {
4484                if planned_attributes.login == Some(true) {
4485                    sql_bail!("conflicting or redundant options");
4486                }
4487                planned_attributes.login = Some(false);
4488            }
4489        }
4490    }
4491    if planned_attributes.inherit == Some(false) {
4492        bail_unsupported!("non inherit roles");
4493    }
4494
4495    Ok(planned_attributes)
4496}
4497
4498#[derive(Debug)]
4499pub enum PlannedRoleVariable {
4500    Set { name: String, value: VariableValue },
4501    Reset { name: String },
4502}
4503
4504impl PlannedRoleVariable {
4505    pub fn name(&self) -> &str {
4506        match self {
4507            PlannedRoleVariable::Set { name, .. } => name,
4508            PlannedRoleVariable::Reset { name } => name,
4509        }
4510    }
4511}
4512
4513fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
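    // For illustration: `ALTER ROLE r SET cluster = quickstart` arrives as
    // `SetRoleVar::Set` and plans to `PlannedRoleVariable::Set`, while
    // `ALTER ROLE r RESET cluster` plans to `PlannedRoleVariable::Reset`.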
4514    let plan = match variable {
4515        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4516            name: name.to_string(),
4517            value: scl::plan_set_variable_to(value)?,
4518        },
4519        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4520            name: name.to_string(),
4521        },
4522    };
4523    Ok(plan)
4524}
4525
4526pub fn describe_create_role(
4527    _: &StatementContext,
4528    _: CreateRoleStatement,
4529) -> Result<StatementDesc, PlanError> {
4530    Ok(StatementDesc::new(None))
4531}
4532
4533pub fn plan_create_role(
4534    scx: &StatementContext,
4535    CreateRoleStatement { name, options }: CreateRoleStatement,
4536) -> Result<Plan, PlanError> {
4537    let attributes = plan_role_attributes(options, scx)?;
4538    Ok(Plan::CreateRole(CreateRolePlan {
4539        name: normalize::ident(name),
4540        attributes: attributes.into(),
4541    }))
4542}
4543
4544pub fn plan_create_network_policy(
4545    ctx: &StatementContext,
4546    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4547) -> Result<Plan, PlanError> {
4548    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4549    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4550
4551    let Some(rule_defs) = policy_options.rules else {
4552        sql_bail!("RULES must be specified when creating network policies.");
4553    };
4554
4555    let mut rules = vec![];
4556    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4557        let NetworkPolicyRuleOptionExtracted {
4558            seen: _,
4559            direction,
4560            action,
4561            address,
4562        } = options.try_into()?;
4563        let (direction, action, address) = match (direction, action, address) {
4564            (Some(direction), Some(action), Some(address)) => (
4565                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4566                NetworkPolicyRuleAction::try_from(action.as_str())?,
4567                PolicyAddress::try_from(address.as_str())?,
4568            ),
4569            (_, _, _) => {
4570                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4571            }
4572        };
4573        rules.push(NetworkPolicyRule {
4574            name: normalize::ident(name),
4575            direction,
4576            action,
4577            address,
4578        });
4579    }
4580
4581    if rules.len()
4582        > ctx
4583            .catalog
4584            .system_vars()
4585            .max_rules_per_network_policy()
4586            .try_into()?
4587    {
4588        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4589    }
4590
4591    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4592        name: normalize::ident(name),
4593        rules,
4594    }))
4595}
4596
4597pub fn plan_alter_network_policy(
4598    ctx: &StatementContext,
4599    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4600) -> Result<Plan, PlanError> {
4601    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4602
4603    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4604    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4605
4606    let Some(rule_defs) = policy_options.rules else {
4607        sql_bail!("RULES must be specified when creating network policies.");
4608    };
4609
4610    let mut rules = vec![];
4611    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4612        let NetworkPolicyRuleOptionExtracted {
4613            seen: _,
4614            direction,
4615            action,
4616            address,
4617        } = options.try_into()?;
4618
4619        let (direction, action, address) = match (direction, action, address) {
4620            (Some(direction), Some(action), Some(address)) => (
4621                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4622                NetworkPolicyRuleAction::try_from(action.as_str())?,
4623                PolicyAddress::try_from(address.as_str())?,
4624            ),
4625            (_, _, _) => {
4626                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4627            }
4628        };
4629        rules.push(NetworkPolicyRule {
4630            name: normalize::ident(name),
4631            direction,
4632            action,
4633            address,
4634        });
4635    }
4636    if rules.len()
4637        > ctx
4638            .catalog
4639            .system_vars()
4640            .max_rules_per_network_policy()
4641            .try_into()?
4642    {
4643        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4644    }
4645
4646    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4647        id: policy.id(),
4648        name: normalize::ident(name),
4649        rules,
4650    }))
4651}
4652
4653pub fn describe_create_cluster(
4654    _: &StatementContext,
4655    _: CreateClusterStatement<Aug>,
4656) -> Result<StatementDesc, PlanError> {
4657    Ok(StatementDesc::new(None))
4658}
4659
4660// WARNING:
4661// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4662// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4663// that option stays the same. If you were to give a default value here, then not giving that option
4664// to ALTER CLUSTER would always reset the value of that option to the default.
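// For example, if SIZE had a built-in default here, then
// `ALTER CLUSTER c SET (REPLICATION FACTOR = 2)` would silently reset the
// cluster's size to that default as well.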
4665generate_extracted_config!(
4666    ClusterOption,
4667    (AvailabilityZones, Vec<String>),
4668    (Disk, bool),
4669    (IntrospectionDebugging, bool),
4670    (IntrospectionInterval, OptionalDuration),
4671    (Managed, bool),
4672    (Replicas, Vec<ReplicaDefinition<Aug>>),
4673    (ReplicationFactor, u32),
4674    (Size, String),
4675    (Schedule, ClusterScheduleOptionValue),
4676    (WorkloadClass, OptionalString)
4677);
4678
4679generate_extracted_config!(
4680    NetworkPolicyOption,
4681    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4682);
4683
4684generate_extracted_config!(
4685    NetworkPolicyRuleOption,
4686    (Direction, String),
4687    (Action, String),
4688    (Address, String)
4689);
4690
4691generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4692
4693generate_extracted_config!(
4694    ClusterAlterUntilReadyOption,
4695    (Timeout, Duration),
4696    (OnTimeout, String)
4697);
4698
4699generate_extracted_config!(
4700    ClusterFeature,
4701    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4702    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4703    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4704    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4705    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4706    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4707    (
4708        EnableProjectionPushdownAfterRelationCse,
4709        Option<bool>,
4710        Default(None)
4711    )
4712);
4713
4714/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4715///
4716/// The reverse of [`unplan_create_cluster`].
4717pub fn plan_create_cluster(
4718    scx: &StatementContext,
4719    stmt: CreateClusterStatement<Aug>,
4720) -> Result<Plan, PlanError> {
4721    let plan = plan_create_cluster_inner(scx, stmt)?;
4722
4723    // Roundtrip through unplan and make sure that we end up with the same plan.
4724    if let CreateClusterVariant::Managed(_) = &plan.variant {
4725        let stmt = unplan_create_cluster(scx, plan.clone())
4726            .map_err(|e| PlanError::Replan(e.to_string()))?;
4727        let create_sql = stmt.to_ast_string_stable();
4728        let stmt = parse::parse(&create_sql)
4729            .map_err(|e| PlanError::Replan(e.to_string()))?
4730            .into_element()
4731            .ast;
4732        let (stmt, _resolved_ids) =
4733            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4734        let stmt = match stmt {
4735            Statement::CreateCluster(stmt) => stmt,
4736            stmt => {
4737                return Err(PlanError::Replan(format!(
4738                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4739                )));
4740            }
4741        };
4742        let replan =
4743            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4744        if plan != replan {
4745            return Err(PlanError::Replan(format!(
4746                "replan does not match: plan={plan:?}, replan={replan:?}"
4747            )));
4748        }
4749    }
4750
4751    Ok(Plan::CreateCluster(plan))
4752}
4753
4754pub fn plan_create_cluster_inner(
4755    scx: &StatementContext,
4756    CreateClusterStatement {
4757        name,
4758        options,
4759        features,
4760    }: CreateClusterStatement<Aug>,
4761) -> Result<CreateClusterPlan, PlanError> {
4762    let ClusterOptionExtracted {
4763        availability_zones,
4764        introspection_debugging,
4765        introspection_interval,
4766        managed,
4767        replicas,
4768        replication_factor,
4769        seen: _,
4770        size,
4771        disk,
4772        schedule,
4773        workload_class,
4774    }: ClusterOptionExtracted = options.try_into()?;
4775
4776    let managed = managed.unwrap_or_else(|| replicas.is_none());
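    // e.g., `CREATE CLUSTER c (SIZE '100cc')` defaults to managed, while
    // `CREATE CLUSTER c (REPLICAS (r1 (SIZE '100cc')))` defaults to unmanaged.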
4777
4778    if !scx.catalog.active_role_id().is_system() {
4779        if !features.is_empty() {
4780            sql_bail!("FEATURES not supported for non-system users");
4781        }
4782        if workload_class.is_some() {
4783            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4784        }
4785    }
4786
4787    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4788    let workload_class = workload_class.and_then(|v| v.0);
4789
4790    if managed {
4791        if replicas.is_some() {
4792            sql_bail!("REPLICAS not supported for managed clusters");
4793        }
4794        let Some(size) = size else {
4795            sql_bail!("SIZE must be specified for managed clusters");
4796        };
4797
4798        if disk.is_some() {
4799            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4800            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4801            // we'll be able to remove the `DISK` option entirely.
4802            if scx.catalog.is_cluster_size_cc(&size) {
4803                sql_bail!(
4804                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4805                );
4806            }
4807
4808            scx.catalog
4809                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4810        }
4811
4812        let compute = plan_compute_replica_config(
4813            introspection_interval,
4814            introspection_debugging.unwrap_or(false),
4815        )?;
4816
4817        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4818            replication_factor.unwrap_or_else(|| {
4819                scx.catalog
4820                    .system_vars()
4821                    .default_cluster_replication_factor()
4822            })
4823        } else {
4824            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4825            if replication_factor.is_some() {
4826                sql_bail!(
4827                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4828                );
4829            }
4830            // If we have a non-trivial schedule, start without any replicas, to avoid
4831            // quickly going back and forth if the schedule doesn't want a replica right
4832            // away.
4833            0
4834        };
4835        let availability_zones = availability_zones.unwrap_or_default();
4836
4837        if !availability_zones.is_empty() {
4838            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4839        }
4840
4841        // Plan OptimizerFeatureOverrides.
4842        let ClusterFeatureExtracted {
4843            reoptimize_imported_views,
4844            enable_eager_delta_joins,
4845            enable_new_outer_join_lowering,
4846            enable_variadic_left_join_lowering,
4847            enable_letrec_fixpoint_analysis,
4848            enable_join_prioritize_arranged,
4849            enable_projection_pushdown_after_relation_cse,
4850            seen: _,
4851        } = ClusterFeatureExtracted::try_from(features)?;
4852        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4853            reoptimize_imported_views,
4854            enable_eager_delta_joins,
4855            enable_new_outer_join_lowering,
4856            enable_variadic_left_join_lowering,
4857            enable_letrec_fixpoint_analysis,
4858            enable_join_prioritize_arranged,
4859            enable_projection_pushdown_after_relation_cse,
4860            ..Default::default()
4861        };
4862
4863        let schedule = plan_cluster_schedule(schedule)?;
4864
4865        Ok(CreateClusterPlan {
4866            name: normalize::ident(name),
4867            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4868                replication_factor,
4869                size,
4870                availability_zones,
4871                compute,
4872                optimizer_feature_overrides,
4873                schedule,
4874            }),
4875            workload_class,
4876        })
4877    } else {
4878        let Some(replica_defs) = replicas else {
4879            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4880        };
4881        if availability_zones.is_some() {
4882            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4883        }
4884        if replication_factor.is_some() {
4885            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4886        }
4887        if introspection_debugging.is_some() {
4888            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4889        }
4890        if introspection_interval.is_some() {
4891            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4892        }
4893        if size.is_some() {
4894            sql_bail!("SIZE not supported for unmanaged clusters");
4895        }
4896        if disk.is_some() {
4897            sql_bail!("DISK not supported for unmanaged clusters");
4898        }
4899        if !features.is_empty() {
4900            sql_bail!("FEATURES not supported for unmanaged clusters");
4901        }
4902        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4903            sql_bail!(
4904                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4905            );
4906        }
4907
4908        let mut replicas = vec![];
4909        for ReplicaDefinition { name, options } in replica_defs {
4910            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4911        }
4912
4913        Ok(CreateClusterPlan {
4914            name: normalize::ident(name),
4915            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4916            workload_class,
4917        })
4918    }
4919}
4920
4921/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4922///
4923/// The reverse of [`plan_create_cluster`].
4924pub fn unplan_create_cluster(
4925    scx: &StatementContext,
4926    CreateClusterPlan {
4927        name,
4928        variant,
4929        workload_class,
4930    }: CreateClusterPlan,
4931) -> Result<CreateClusterStatement<Aug>, PlanError> {
4932    match variant {
4933        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4934            replication_factor,
4935            size,
4936            availability_zones,
4937            compute,
4938            optimizer_feature_overrides,
4939            schedule,
4940        }) => {
4941            let schedule = unplan_cluster_schedule(schedule);
4942            let OptimizerFeatureOverrides {
4943                enable_guard_subquery_tablefunc: _,
4944                enable_consolidate_after_union_negate: _,
4945                enable_reduce_mfp_fusion: _,
4946                enable_cardinality_estimates: _,
4947                persist_fast_path_limit: _,
4948                reoptimize_imported_views,
4949                enable_eager_delta_joins,
4950                enable_new_outer_join_lowering,
4951                enable_variadic_left_join_lowering,
4952                enable_letrec_fixpoint_analysis,
4953                enable_reduce_reduction: _,
4954                enable_join_prioritize_arranged,
4955                enable_projection_pushdown_after_relation_cse,
4956                enable_less_reduce_in_eqprop: _,
4957                enable_dequadratic_eqprop_map: _,
4958                enable_eq_classes_withholding_errors: _,
4959                enable_fast_path_plan_insights: _,
4960            } = optimizer_feature_overrides;
4961            // The ones from above that don't occur below are not wired up to cluster features.
4962            let features_extracted = ClusterFeatureExtracted {
4963                // Seen is ignored when unplanning.
4964                seen: Default::default(),
4965                reoptimize_imported_views,
4966                enable_eager_delta_joins,
4967                enable_new_outer_join_lowering,
4968                enable_variadic_left_join_lowering,
4969                enable_letrec_fixpoint_analysis,
4970                enable_join_prioritize_arranged,
4971                enable_projection_pushdown_after_relation_cse,
4972            };
4973            let features = features_extracted.into_values(scx.catalog);
4974            let availability_zones = if availability_zones.is_empty() {
4975                None
4976            } else {
4977                Some(availability_zones)
4978            };
4979            let (introspection_interval, introspection_debugging) =
4980                unplan_compute_replica_config(compute);
4981            // Replication factor cannot be explicitly specified with a refresh schedule; it's
4982            // always 1 or less.
4983            let replication_factor = match &schedule {
4984                ClusterScheduleOptionValue::Manual => Some(replication_factor),
4985                ClusterScheduleOptionValue::Refresh { .. } => {
4986                    assert!(
4987                        replication_factor <= 1,
4988                        "replication factor, {replication_factor:?}, must be <= 1"
4989                    );
4990                    None
4991                }
4992            };
4993            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4994            let options_extracted = ClusterOptionExtracted {
4995                // Seen is ignored when unplanning.
4996                seen: Default::default(),
4997                availability_zones,
4998                disk: None,
4999                introspection_debugging: Some(introspection_debugging),
5000                introspection_interval,
5001                managed: Some(true),
5002                replicas: None,
5003                replication_factor,
5004                size: Some(size),
5005                schedule: Some(schedule),
5006                workload_class,
5007            };
5008            let options = options_extracted.into_values(scx.catalog);
5009            let name = Ident::new_unchecked(name);
5010            Ok(CreateClusterStatement {
5011                name,
5012                options,
5013                features,
5014            })
5015        }
5016        CreateClusterVariant::Unmanaged(_) => {
5017            bail_unsupported!("SHOW CREATE for unmanaged clusters")
5018        }
5019    }
5020}
5021
5022generate_extracted_config!(
5023    ReplicaOption,
5024    (AvailabilityZone, String),
5025    (BilledAs, String),
5026    (ComputeAddresses, Vec<String>),
5027    (ComputectlAddresses, Vec<String>),
5028    (Disk, bool),
5029    (Internal, bool, Default(false)),
5030    (IntrospectionDebugging, bool, Default(false)),
5031    (IntrospectionInterval, OptionalDuration),
5032    (Size, String),
5033    (StorageAddresses, Vec<String>),
5034    (StoragectlAddresses, Vec<String>),
5035    (Workers, u16)
5036);
5037
5038fn plan_replica_config(
5039    scx: &StatementContext,
5040    options: Vec<ReplicaOption<Aug>>,
5041) -> Result<ReplicaConfig, PlanError> {
5042    let ReplicaOptionExtracted {
5043        availability_zone,
5044        billed_as,
5045        computectl_addresses,
5046        disk,
5047        internal,
5048        introspection_debugging,
5049        introspection_interval,
5050        size,
5051        storagectl_addresses,
5052        ..
5053    }: ReplicaOptionExtracted = options.try_into()?;
5054
5055    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
5056
5057    match (
5058        size,
5059        availability_zone,
5060        billed_as,
5061        storagectl_addresses,
5062        computectl_addresses,
5063    ) {
5064        // Common cases we expect end users to hit.
5065        (None, _, None, None, None) => {
5066            // We don't mention the unmanaged options in the error message
5067            // because they are only available in unsafe mode.
5068            sql_bail!("SIZE option must be specified");
5069        }
5070        (Some(size), availability_zone, billed_as, None, None) => {
5071            if disk.is_some() {
5072                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
5073                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
5074                // we'll be able to remove the `DISK` option entirely.
5075                if scx.catalog.is_cluster_size_cc(&size) {
5076                    sql_bail!(
5077                        "DISK option not supported for modern cluster sizes because disk is always enabled"
5078                    );
5079                }
5080
5081                scx.catalog
5082                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5083            }
5084
5085            Ok(ReplicaConfig::Orchestrated {
5086                size,
5087                availability_zone,
5088                compute,
5089                billed_as,
5090                internal,
5091            })
5092        }
5093
5094        (None, None, None, storagectl_addresses, computectl_addresses) => {
5095            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5096
5097            // When manually testing Materialize in unsafe mode, it's easy to
5098            // accidentally omit one of these options, so we try to produce
5099            // helpful error messages.
5100            let Some(storagectl_addrs) = storagectl_addresses else {
5101                sql_bail!("missing STORAGECTL ADDRESSES option");
5102            };
5103            let Some(computectl_addrs) = computectl_addresses else {
5104                sql_bail!("missing COMPUTECTL ADDRESSES option");
5105            };
5106
5107            if storagectl_addrs.len() != computectl_addrs.len() {
5108                sql_bail!(
5109                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5110                );
5111            }
5112
5113            if disk.is_some() {
5114                sql_bail!("DISK can't be specified for unorchestrated clusters");
5115            }
5116
5117            Ok(ReplicaConfig::Unorchestrated {
5118                storagectl_addrs,
5119                computectl_addrs,
5120                compute,
5121            })
5122        }
5123        _ => {
5124            // We don't bother trying to produce a more helpful error message
5125            // here because no user is likely to hit this path.
5126            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5127        }
5128    }
5129}
5130
5131/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5132///
5133/// The reverse of [`unplan_compute_replica_config`].
5134fn plan_compute_replica_config(
5135    introspection_interval: Option<OptionalDuration>,
5136    introspection_debugging: bool,
5137) -> Result<ComputeReplicaConfig, PlanError> {
5138    let introspection_interval = introspection_interval
5139        .map(|OptionalDuration(i)| i)
5140        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
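    // e.g., omitting INTROSPECTION INTERVAL falls back to
    // `DEFAULT_REPLICA_LOGGING_INTERVAL`; a disabled interval
    // (`OptionalDuration(None)`) turns introspection off below.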
5141    let introspection = match introspection_interval {
5142        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5143            interval,
5144            debugging: introspection_debugging,
5145        }),
5146        None if introspection_debugging => {
5147            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5148        }
5149        None => None,
5150    };
5151    let compute = ComputeReplicaConfig { introspection };
5152    Ok(compute)
5153}
5154
5155/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5156///
5157/// The reverse of [`plan_compute_replica_config`].
5158fn unplan_compute_replica_config(
5159    compute_replica_config: ComputeReplicaConfig,
5160) -> (Option<OptionalDuration>, bool) {
5161    match compute_replica_config.introspection {
5162        Some(ComputeReplicaIntrospectionConfig {
5163            debugging,
5164            interval,
5165        }) => (Some(OptionalDuration(Some(interval))), debugging),
5166        None => (Some(OptionalDuration(None)), false),
5167    }
5168}
5169
5170/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5171///
5172/// The reverse of [`unplan_cluster_schedule`].
5173fn plan_cluster_schedule(
5174    schedule: ClusterScheduleOptionValue,
5175) -> Result<ClusterSchedule, PlanError> {
5176    Ok(match schedule {
5177        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5178        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5179        ClusterScheduleOptionValue::Refresh {
5180            hydration_time_estimate: None,
5181        } => ClusterSchedule::Refresh {
5182            hydration_time_estimate: Duration::from_millis(0),
5183        },
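        // For illustration (assuming the usual option syntax): `SCHEDULE = ON
        // REFRESH (HYDRATION TIME ESTIMATE = '1 hour')` yields a one-hour
        // `hydration_time_estimate` via the arm below.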
5184        // Otherwise we convert the `IntervalValue` to a `Duration`.
5185        ClusterScheduleOptionValue::Refresh {
5186            hydration_time_estimate: Some(interval_value),
5187        } => {
5188            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5189            if interval.as_microseconds() < 0 {
5190                sql_bail!(
5191                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5192                    interval
5193                );
5194            }
5195            if interval.months != 0 {
5196                // This limitation exists because we want this interval to be cleanly convertible
5197                // to a unix epoch timestamp difference. When the interval involves months, this
5198                // is no longer true, because months have variable lengths.
5199                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5200            }
5201            let duration = interval.duration()?;
5202            if u64::try_from(duration.as_millis()).is_err()
5203                || Interval::from_duration(&duration).is_err()
5204            {
5205                sql_bail!("HYDRATION TIME ESTIMATE too large");
5206            }
5207            ClusterSchedule::Refresh {
5208                hydration_time_estimate: duration,
5209            }
5210        }
5211    })
5212}
5213
5214/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5215///
5216/// The reverse of [`plan_cluster_schedule`].
5217fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5218    match schedule {
5219        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5220        ClusterSchedule::Refresh {
5221            hydration_time_estimate,
5222        } => {
5223            let interval = Interval::from_duration(&hydration_time_estimate)
5224                .expect("planning ensured that this is convertible back to Interval");
5225            let interval_value = literal::unplan_interval(&interval);
5226            ClusterScheduleOptionValue::Refresh {
5227                hydration_time_estimate: Some(interval_value),
5228            }
5229        }
5230    }
5231}
5232
5233pub fn describe_create_cluster_replica(
5234    _: &StatementContext,
5235    _: CreateClusterReplicaStatement<Aug>,
5236) -> Result<StatementDesc, PlanError> {
5237    Ok(StatementDesc::new(None))
5238}
5239
5240pub fn plan_create_cluster_replica(
5241    scx: &StatementContext,
5242    CreateClusterReplicaStatement {
5243        definition: ReplicaDefinition { name, options },
5244        of_cluster,
5245    }: CreateClusterReplicaStatement<Aug>,
5246) -> Result<Plan, PlanError> {
5247    let cluster = scx
5248        .catalog
5249        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5250    let current_replica_count = cluster.replica_ids().iter().count();
5251    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5252        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5253        return Err(PlanError::CreateReplicaFailStorageObjects {
5254            current_replica_count,
5255            internal_replica_count,
5256            hypothetical_replica_count: current_replica_count + 1,
5257        });
5258    }
5259
5260    let config = plan_replica_config(scx, options)?;
5261
5262    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5263        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5264            return Err(PlanError::MangedReplicaName(name.into_string()));
5265        }
5266    } else {
5267        ensure_cluster_is_not_managed(scx, cluster.id())?;
5268    }
5269
5270    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5271        name: normalize::ident(name),
5272        cluster_id: cluster.id(),
5273        config,
5274    }))
5275}
5276
5277pub fn describe_create_secret(
5278    _: &StatementContext,
5279    _: CreateSecretStatement<Aug>,
5280) -> Result<StatementDesc, PlanError> {
5281    Ok(StatementDesc::new(None))
5282}
5283
5284pub fn plan_create_secret(
5285    scx: &StatementContext,
5286    stmt: CreateSecretStatement<Aug>,
5287) -> Result<Plan, PlanError> {
5288    let CreateSecretStatement {
5289        name,
5290        if_not_exists,
5291        value,
5292    } = &stmt;
5293
5294    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5295    let mut create_sql_statement = stmt.clone();
5296    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5297    let create_sql =
5298        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
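    // e.g., the stored create_sql reads `CREATE SECRET s AS '********'`; the
    // plaintext value never round-trips through the catalog.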
5299    let secret_as = query::plan_secret_as(scx, value.clone())?;
5300
5301    let secret = Secret {
5302        create_sql,
5303        secret_as,
5304    };
5305
5306    Ok(Plan::CreateSecret(CreateSecretPlan {
5307        name,
5308        secret,
5309        if_not_exists: *if_not_exists,
5310    }))
5311}
5312
5313pub fn describe_create_connection(
5314    _: &StatementContext,
5315    _: CreateConnectionStatement<Aug>,
5316) -> Result<StatementDesc, PlanError> {
5317    Ok(StatementDesc::new(None))
5318}
5319
5320generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5321
5322pub fn plan_create_connection(
5323    scx: &StatementContext,
5324    mut stmt: CreateConnectionStatement<Aug>,
5325) -> Result<Plan, PlanError> {
5326    let CreateConnectionStatement {
5327        name,
5328        connection_type,
5329        values,
5330        if_not_exists,
5331        with_options,
5332    } = stmt.clone();
5333    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5334    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5335    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5336
5337    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5338    if options.validate.is_some() {
5339        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5340    }
5341    let validate = match options.validate {
5342        Some(val) => val,
5343        None => {
5344            scx.catalog
5345                .system_vars()
5346                .enable_default_connection_validation()
5347                && details.to_connection().validate_by_default()
5348        }
5349    };
5350
5351    // Check for an object in the catalog with this same name
5352    let full_name = scx.catalog.resolve_full_name(&name);
5353    let partial_name = PartialItemName::from(full_name.clone());
5354    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5355        return Err(PlanError::ItemAlreadyExists {
5356            name: full_name.to_string(),
5357            item_type: item.item_type(),
5358        });
5359    }
5360
5361    // For SSH connections, overwrite the public key options based on the
5362    // connection details, in case we generated new keys during planning.
5363    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5364        stmt.values.retain(|v| {
5365            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5366        });
5367        stmt.values.push(ConnectionOption {
5368            name: ConnectionOptionName::PublicKey1,
5369            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5370        });
5371        stmt.values.push(ConnectionOption {
5372            name: ConnectionOptionName::PublicKey2,
5373            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5374        });
5375    }
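    // e.g., for `CREATE CONNECTION c TO SSH TUNNEL (...)`, the normalized
    // statement now carries the freshly generated PUBLIC KEY 1 / PUBLIC KEY 2
    // values rather than anything the user supplied.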
5376    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5377
5378    let plan = CreateConnectionPlan {
5379        name,
5380        if_not_exists,
5381        connection: crate::plan::Connection {
5382            create_sql,
5383            details,
5384        },
5385        validate,
5386    };
5387    Ok(Plan::CreateConnection(plan))
5388}
5389
5390fn plan_drop_database(
5391    scx: &StatementContext,
5392    if_exists: bool,
5393    name: &UnresolvedDatabaseName,
5394    cascade: bool,
5395) -> Result<Option<DatabaseId>, PlanError> {
5396    Ok(match resolve_database(scx, name, if_exists)? {
5397        Some(database) => {
5398            if !cascade && database.has_schemas() {
5399                sql_bail!(
5400                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5401                    name,
5402                );
5403            }
5404            Some(database.id())
5405        }
5406        None => None,
5407    })
5408}
5409
5410pub fn describe_drop_objects(
5411    _: &StatementContext,
5412    _: DropObjectsStatement,
5413) -> Result<StatementDesc, PlanError> {
5414    Ok(StatementDesc::new(None))
5415}
5416
5417pub fn plan_drop_objects(
5418    scx: &mut StatementContext,
5419    DropObjectsStatement {
5420        object_type,
5421        if_exists,
5422        names,
5423        cascade,
5424    }: DropObjectsStatement,
5425) -> Result<Plan, PlanError> {
5426    assert_ne!(
5427        object_type,
5428        mz_sql_parser::ast::ObjectType::Func,
5429        "rejected in parser"
5430    );
5431    let object_type = object_type.into();
5432
5433    let mut referenced_ids = Vec::new();
5434    for name in names {
5435        let id = match &name {
5436            UnresolvedObjectName::Cluster(name) => {
5437                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5438            }
5439            UnresolvedObjectName::ClusterReplica(name) => {
5440                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5441            }
5442            UnresolvedObjectName::Database(name) => {
5443                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5444            }
5445            UnresolvedObjectName::Schema(name) => {
5446                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5447            }
5448            UnresolvedObjectName::Role(name) => {
5449                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5450            }
5451            UnresolvedObjectName::Item(name) => {
5452                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5453                    .map(ObjectId::Item)
5454            }
5455            UnresolvedObjectName::NetworkPolicy(name) => {
5456                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5457            }
5458        };
5459        match id {
5460            Some(id) => referenced_ids.push(id),
5461            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5462                name: name.to_ast_string_simple(),
5463                object_type,
5464            }),
5465        }
5466    }
5467    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5468
5469    Ok(Plan::DropObjects(DropObjectsPlan {
5470        referenced_ids,
5471        drop_ids,
5472        object_type,
5473    }))
5474}
5475
5476fn plan_drop_schema(
5477    scx: &StatementContext,
5478    if_exists: bool,
5479    name: &UnresolvedSchemaName,
5480    cascade: bool,
5481) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5482    // Special case for mz_temp: with lazy temporary schema creation, the temp
5483    // schema may not exist yet, but we still need to return the correct error.
5484    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
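            // A role cannot be dropped while it is recorded as the grantor of
            // another role's membership, since that membership metadata would
            // otherwise reference a missing role.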
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            // TODO(network_policy): When we support role based network policies, check if any role
            // currently has the specified policy set.
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns `true` if the cluster has any object that requires a single replica.
/// Returns `false` otherwise, including when the cluster has no objects.
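///
/// Without the `ENABLE_MULTI_REPLICA_SOURCES` dyncfg, sources and sinks are the
/// object types that require a single replica.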
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    // If this feature is enabled, then all objects support multiple replicas.
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        // Otherwise we check for the existence of sources or sinks.
        cluster.bound_objects().iter().any(|id| {
            let item = scx.catalog.get_item(id);
            matches!(
                item.item_type(),
                CatalogItemType::Sink | CatalogItemType::Source
            )
        })
    }
}

fn plan_drop_cluster_replica(
    scx: &StatementContext,
    if_exists: bool,
    name: &QualifiedReplica,
) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
}

/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `DROP VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
                //  relies on the entry. Unfortunately, we don't have that information readily available.
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}

/// Does the dependency `dep` prevent a non-cascading drop of an object of type `object_type`?
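///
/// Indexes are dropped together with their parent relation, so they never
/// prevent a drop; conversely, when dropping a type, any dependent prevents it.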
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        _ => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            CatalogItemType::Index => false,
        },
    }
}

pub fn describe_alter_index_options(
    _: &StatementContext,
    _: AlterIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_drop_owned(
    _: &StatementContext,
    _: DropOwnedStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

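    // Collects, for `object_id`, every privilege whose grantee is one of the
    // roles being dropped, so that the plan can revoke it.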
    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Replicas
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            // Note: CASCADE is not required for replicas.
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                // Checks if any items still depend on this one, returning an error if so.
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                // When this item gets dropped it will also drop its progress source, so we need to
                // check the users of those.
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    let drop_ids = scx.catalog.object_dependents(&drop_ids);

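    // Expanding to transitive dependents can pull in system objects, which may
    // never be dropped; report the offending owners if so.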
    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}

fn plan_retain_history_option(
    scx: &StatementContext,
    retain_history: Option<OptionalDuration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    if let Some(OptionalDuration(lcw)) = retain_history {
        Ok(Some(plan_retain_history(scx, lcw)?))
    } else {
        Ok(None)
    }
}

// Convert a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
// `DisableCompaction`. A literal zero duration is an internal error, because the
// `OptionalDuration` type already converts a zero duration into `None` before we get here. This
// function must not be called in the `RESET (RETAIN HISTORY)` path, which should be handled by
// the outer `Option<OptionalDuration>` being `None`.
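// For example, a window shorter than the default compaction window errors unless
// unlimited retain history is enabled, and `None` (a zero duration) disables
// compaction only under that same flag; otherwise it errors as well.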
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
        // disable compaction), and should never occur here. Furthermore, some things actually do
        // break when this is set to real zero:
        // https://github.com/MaterializeInc/database-issues/issues/3798.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Error if the duration is low and enable_unlimited_retain_history is not set (which
            // should only be possible during testing).
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction seems
        // to be a bad choice, so prevent it.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

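// `generate_extracted_config!` derives an `IndexOptionExtracted` struct whose
// `TryFrom<Vec<IndexOption<Aug>>>` impl collects each `WITH` option into a typed
// field, as used in `plan_index_options` below.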
generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        // Index options are not durable.
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

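            // Managed and unmanaged clusters accept disjoint sets of options, so
            // validate against the requested (or, if unchanged, current)
            // management mode.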
            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        // Total number of replicas running is internal replicas
                        // + replication factor.
                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // AlterClusterPlanStrategies other than None will stand up pending replicas
                        // of the new configuration, which violates the single-replica constraint
                        // for sources. If there are any storage objects (sources or sinks) we
                        // should just fail.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Prevent access to `SET CLUSTER` for unsupported objects.
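    // Only materialized views are currently supported; indexes, sinks, and
    // sources bail as not yet implemented, and everything else as never
    // supported.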
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("no cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!(
            "cannot rename schemas in the ambient database: {:?}",
            mz_repr::namespaces::MZ_TEMP_SCHEMA
        );
    }

    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Make sure the name is unique.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    // Prevent users from renaming system related schemas.
    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    // Also check name_b (the target schema name)
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // We cannot swap system schemas.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary name we can swap schema_a to.
    //
    // `check` returns whether the proposed temporary schema name is available.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `DROP VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // For PostgreSQL compatibility, items and types cannot have
            // overlapping names in a variety of situations. See the comment on
            // `CatalogItemType::conflicts_with_type` for details.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // Temp name does not exist, so we can use it.
            Err(CatalogError::UnknownCluster(_)) => true,
            // Temp name already exists!
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // We'll try 10 times to generate a temporary suffix.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            // Call the provided closure to make sure this name is unique!
            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

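    // Swaps are planned as a three-way rename through the uniquely named
    // temporary object chosen above (e.g. `mz_cluster_swap_<suffix>`).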
    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            // View gets a special error below.
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // Save the original value so we can write it back down in the create_sql catalog item.
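            // For example, `ALTER TABLE t SET (RETAIN HISTORY FOR '30d')` carries
            // a `RetainHistoryFor` value here, while `ALTER TABLE t RESET (RETAIN
            // HISTORY)` arrives as `None` and resets to the default window.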
6849            let (value, lcw) = match &history {
6850                Some(WithOptionValue::RetainHistoryFor(value)) => {
6851                    let window = OptionalDuration::try_from_value(value.clone())?;
6852                    (Some(value.clone()), window.0)
6853                }
6854                // None is RESET, so use the default CW.
6855                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6856                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6857            };
6858            let window = plan_retain_history(scx, lcw)?;
6859
6860            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6861                id: entry.id(),
6862                value,
6863                window,
6864                object_type,
6865            }))
6866        }
6867        None => {
6868            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6869                name: name.to_ast_string_simple(),
6870                object_type,
6871            });
6872
6873            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6874        }
6875    }
6876}
6877
6878pub fn describe_alter_secret_options(
6879    _: &StatementContext,
6880    _: AlterSecretStatement<Aug>,
6881) -> Result<StatementDesc, PlanError> {
6882    Ok(StatementDesc::new(None))
6883}
6884
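// Illustrative usage (secret name and value are hypothetical):
//
//     ALTER SECRET kafka_password AS 'new-password';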
6885pub fn plan_alter_secret(
6886    scx: &mut StatementContext,
6887    stmt: AlterSecretStatement<Aug>,
6888) -> Result<Plan, PlanError> {
6889    let AlterSecretStatement {
6890        name,
6891        if_exists,
6892        value,
6893    } = stmt;
6894    let object_type = ObjectType::Secret;
6895    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6896        Some(entry) => entry.id(),
6897        None => {
6898            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6899                name: name.to_string(),
6900                object_type,
6901            });
6902
6903            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6904        }
6905    };
6906
6907    let secret_as = query::plan_secret_as(scx, value)?;
6908
6909    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6910}
6911
6912pub fn describe_alter_connection(
6913    _: &StatementContext,
6914    _: AlterConnectionStatement<Aug>,
6915) -> Result<StatementDesc, PlanError> {
6916    Ok(StatementDesc::new(None))
6917}
6918
6919generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6920
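// A sketch of the statement shapes handled below (connection and option
// names are hypothetical): ROTATE KEYS must appear alone, while SET/DROP
// actions may be combined and optionally validated via WITH (VALIDATE ...):
//
//     ALTER CONNECTION ssh_conn ROTATE KEYS;
//     ALTER CONNECTION kafka_conn SET (BROKER 'broker:9092') WITH (VALIDATE = true);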
6921pub fn plan_alter_connection(
6922    scx: &StatementContext,
6923    stmt: AlterConnectionStatement<Aug>,
6924) -> Result<Plan, PlanError> {
6925    let AlterConnectionStatement {
6926        name,
6927        if_exists,
6928        actions,
6929        with_options,
6930    } = stmt;
6931    let conn_name = normalize::unresolved_item_name(name)?;
6932    let entry = match scx.catalog.resolve_item(&conn_name) {
6933        Ok(entry) => entry,
6934        Err(_) if if_exists => {
6935            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6936                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
6938            });
6939
6940            return Ok(Plan::AlterNoop(AlterNoopPlan {
6941                object_type: ObjectType::Connection,
6942            }));
6943        }
6944        Err(e) => return Err(e.into()),
6945    };
6946
6947    let connection = entry.connection()?;
6948
6949    if actions
6950        .iter()
6951        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6952    {
6953        if actions.len() > 1 {
6954            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6955        }
6956
6957        if !with_options.is_empty() {
6958            sql_bail!(
6959                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6960                with_options
6961                    .iter()
6962                    .map(|o| o.to_ast_string_simple())
6963                    .join(", ")
6964            );
6965        }
6966
6967        if !matches!(connection, Connection::Ssh(_)) {
6968            sql_bail!(
6969                "{} is not an SSH connection",
6970                scx.catalog.resolve_full_name(entry.name())
6971            )
6972        }
6973
6974        return Ok(Plan::AlterConnection(AlterConnectionPlan {
6975            id: entry.id(),
6976            action: crate::plan::AlterConnectionAction::RotateKeys,
6977        }));
6978    }
6979
6980    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6981    if options.validate.is_some() {
6982        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6983    }
6984
6985    let validate = match options.validate {
6986        Some(val) => val,
6987        None => {
6988            scx.catalog
6989                .system_vars()
6990                .enable_default_connection_validation()
6991                && connection.validate_by_default()
6992        }
6993    };
6994
6995    let connection_type = match connection {
6996        Connection::Aws(_) => CreateConnectionType::Aws,
6997        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6998        Connection::Kafka(_) => CreateConnectionType::Kafka,
6999        Connection::Csr(_) => CreateConnectionType::Csr,
7000        Connection::Postgres(_) => CreateConnectionType::Postgres,
7001        Connection::Ssh(_) => CreateConnectionType::Ssh,
7002        Connection::MySql(_) => CreateConnectionType::MySql,
7003        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7004        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7005    };
7006
7007    // Collect all options irrespective of action taken on them.
7008    let specified_options: BTreeSet<_> = actions
7009        .iter()
7010        .map(|action: &AlterConnectionAction<Aug>| match action {
7011            AlterConnectionAction::SetOption(option) => option.name.clone(),
7012            AlterConnectionAction::DropOption(name) => name.clone(),
7013            AlterConnectionAction::RotateKeys => unreachable!(),
7014        })
7015        .collect();
7016
7017    for invalid in INALTERABLE_OPTIONS {
7018        if specified_options.contains(invalid) {
7019            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7020        }
7021    }
7022
7023    connection::validate_options_per_connection_type(connection_type, specified_options)?;
7024
7025    // Partition operations into set and drop
7026    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
7027        actions.into_iter().partition_map(|action| match action {
7028            AlterConnectionAction::SetOption(option) => Either::Left(option),
7029            AlterConnectionAction::DropOption(name) => Either::Right(name),
7030            AlterConnectionAction::RotateKeys => unreachable!(),
7031        });
7032
7033    let set_options: BTreeMap<_, _> = set_options_vec
7034        .clone()
7035        .into_iter()
7036        .map(|option| (option.name, option.value))
7037        .collect();
7038
    // Type check the SET values and catch duplicates: we don't want to let
    // users both DROP and SET the same option in a single statement, so it's
    // fine here to treat the drops as if they were sets when checking for
    // overlap.
7042    let connection_options_extracted =
7043        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7044
7045    let duplicates: Vec<_> = connection_options_extracted
7046        .seen
7047        .intersection(&drop_options)
7048        .collect();
7049
7050    if !duplicates.is_empty() {
7051        sql_bail!(
7052            "cannot both SET and DROP/RESET options {}",
7053            duplicates
7054                .iter()
7055                .map(|option| option.to_string())
7056                .join(", ")
7057        )
7058    }
7059
7060    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7061        let set_options_count = mutually_exclusive_options
7062            .iter()
7063            .filter(|o| set_options.contains_key(o))
7064            .count();
7065        let drop_options_count = mutually_exclusive_options
7066            .iter()
7067            .filter(|o| drop_options.contains(o))
7068            .count();
7069
7070        // Disallow setting _and_ resetting mutually exclusive options
7071        if set_options_count > 0 && drop_options_count > 0 {
7072            sql_bail!(
7073                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7074                connection_type,
7075                mutually_exclusive_options
7076                    .iter()
7077                    .map(|option| option.to_string())
7078                    .join(", ")
7079            )
7080        }
7081
        // If any mutually exclusive option is set or dropped, drop all of its
        // siblings as well. We do this behind the scenes, even though we
        // disallow users from requesting it explicitly, because dropping the
        // siblings is the mechanism by which we overwrite values elsewhere in
        // the code.
7086        if set_options_count > 0 || drop_options_count > 0 {
7087            drop_options.extend(mutually_exclusive_options.iter().cloned());
7088        }
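        // For example (hypothetical option names): if BROKER and BROKERS were
        // a mutually exclusive set for Kafka connections, `SET (BROKER ...)`
        // would also implicitly drop BROKERS, so the new value replaces
        // whichever variant was previously set.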
7089
7090        // n.b. if mutually exclusive options are set, those will error when we
7091        // try to replan the connection.
7092    }
7093
7094    Ok(Plan::AlterConnection(AlterConnectionPlan {
7095        id: entry.id(),
7096        action: crate::plan::AlterConnectionAction::AlterOptions {
7097            set_options,
7098            drop_options,
7099            validate,
7100        },
7101    }))
7102}
7103
7104pub fn describe_alter_sink(
7105    _: &StatementContext,
7106    _: AlterSinkStatement<Aug>,
7107) -> Result<StatementDesc, PlanError> {
7108    Ok(StatementDesc::new(None))
7109}
7110
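// A sketch of the statement this function plans (names are hypothetical);
// retargeting a sink re-plans its original CREATE SINK statement against the
// new upstream relation:
//
//     ALTER SINK avro_sink SET FROM new_upstream;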
7111pub fn plan_alter_sink(
7112    scx: &mut StatementContext,
7113    stmt: AlterSinkStatement<Aug>,
7114) -> Result<Plan, PlanError> {
7115    let AlterSinkStatement {
7116        sink_name,
7117        if_exists,
7118        action,
7119    } = stmt;
7120
7121    let object_type = ObjectType::Sink;
7122    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7123
7124    let Some(item) = item else {
7125        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7126            name: sink_name.to_string(),
7127            object_type,
7128        });
7129
7130        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7131    };
7132    // Always ALTER objects from their latest version.
7133    let item = item.at_version(RelationVersionSelector::Latest);
7134
7135    match action {
7136        AlterSinkAction::ChangeRelation(new_from) => {
7137            // First we reconstruct the original CREATE SINK statement
7138            let create_sql = item.create_sql();
7139            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7140            let [stmt]: [StatementParseResult; 1] = stmts
7141                .try_into()
7142                .expect("create sql of sink was not exactly one statement");
7143            let Statement::CreateSink(stmt) = stmt.ast else {
7144                unreachable!("invalid create SQL for sink item");
7145            };
7146
7147            // Then resolve and swap the resolved from relation to the new one
7148            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7149            stmt.from = new_from;
7150
7151            // Finally re-plan the modified create sink statement to verify the new configuration is valid
7152            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7153                unreachable!("invalid plan for CREATE SINK statement");
7154            };
7155
7156            plan.sink.version += 1;
7157
7158            Ok(Plan::AlterSink(AlterSinkPlan {
7159                item_id: item.id(),
7160                global_id: item.global_id(),
7161                sink: plan.sink,
7162                with_snapshot: plan.with_snapshot,
7163                in_cluster: plan.in_cluster,
7164            }))
7165        }
7166        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7167        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7168    }
7169}
7170
7171pub fn describe_alter_source(
7172    _: &StatementContext,
7173    _: AlterSourceStatement<Aug>,
7174) -> Result<StatementDesc, PlanError> {
    // TODO: should the statement's options be described here?
7176    Ok(StatementDesc::new(None))
7177}
7178
7179generate_extracted_config!(
7180    AlterSourceAddSubsourceOption,
7181    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7182    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7183    (Details, String)
7184);
7185
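// Of the SET/RESET forms, only RETAIN HISTORY can be planned directly; all
// other actions are rejected here or handled during purification. For example
// (source name is hypothetical):
//
//     ALTER SOURCE purchases SET (RETAIN HISTORY FOR '7d');
//     ALTER SOURCE purchases RESET (RETAIN HISTORY);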
7186pub fn plan_alter_source(
7187    scx: &mut StatementContext,
7188    stmt: AlterSourceStatement<Aug>,
7189) -> Result<Plan, PlanError> {
7190    let AlterSourceStatement {
7191        source_name,
7192        if_exists,
7193        action,
7194    } = stmt;
7195    let object_type = ObjectType::Source;
7196
7197    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7198        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7199            name: source_name.to_string(),
7200            object_type,
7201        });
7202
7203        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7204    }
7205
7206    match action {
7207        AlterSourceAction::SetOptions(options) => {
7208            let mut options = options.into_iter();
7209            let option = options.next().unwrap();
7210            if option.name == CreateSourceOptionName::RetainHistory {
7211                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
7213                }
7214                return alter_retain_history(
7215                    scx,
7216                    object_type,
7217                    if_exists,
7218                    UnresolvedObjectName::Item(source_name),
7219                    option.value,
7220                );
7221            }
            // N.B. we use this statement in purification in a way that cannot
            // be planned directly.
            sql_bail!(
                "cannot modify the {} of a SOURCE",
                option.name.to_ast_string_simple()
            );
7228        }
7229        AlterSourceAction::ResetOptions(reset) => {
7230            let mut options = reset.into_iter();
7231            let option = options.next().unwrap();
7232            if option == CreateSourceOptionName::RetainHistory {
7233                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
7235                }
7236                return alter_retain_history(
7237                    scx,
7238                    object_type,
7239                    if_exists,
7240                    UnresolvedObjectName::Item(source_name),
7241                    None,
7242                );
7243            }
            sql_bail!(
                "cannot modify the {} of a SOURCE",
                option.to_ast_string_simple()
            );
7248        }
7249        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE is no longer supported; use DROP SOURCE")
7251        }
7252        AlterSourceAction::AddSubsources { .. } => {
7253            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7254        }
7255        AlterSourceAction::RefreshReferences => {
7256            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7257        }
7258    };
7259}
7260
7261pub fn describe_alter_system_set(
7262    _: &StatementContext,
7263    _: AlterSystemSetStatement,
7264) -> Result<StatementDesc, PlanError> {
7265    Ok(StatementDesc::new(None))
7266}
7267
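// Illustrative usage (the variable name is hypothetical); the RESET and
// RESET ALL counterparts below undo such changes:
//
//     ALTER SYSTEM SET max_tables = 1000;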
7268pub fn plan_alter_system_set(
7269    _: &StatementContext,
7270    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7271) -> Result<Plan, PlanError> {
7272    let name = name.to_string();
7273    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7274        name,
7275        value: scl::plan_set_variable_to(to)?,
7276    }))
7277}
7278
7279pub fn describe_alter_system_reset(
7280    _: &StatementContext,
7281    _: AlterSystemResetStatement,
7282) -> Result<StatementDesc, PlanError> {
7283    Ok(StatementDesc::new(None))
7284}
7285
7286pub fn plan_alter_system_reset(
7287    _: &StatementContext,
7288    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7289) -> Result<Plan, PlanError> {
7290    let name = name.to_string();
7291    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7292}
7293
7294pub fn describe_alter_system_reset_all(
7295    _: &StatementContext,
7296    _: AlterSystemResetAllStatement,
7297) -> Result<StatementDesc, PlanError> {
7298    Ok(StatementDesc::new(None))
7299}
7300
7301pub fn plan_alter_system_reset_all(
7302    _: &StatementContext,
7303    _: AlterSystemResetAllStatement,
7304) -> Result<Plan, PlanError> {
7305    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7306}
7307
7308pub fn describe_alter_role(
7309    _: &StatementContext,
7310    _: AlterRoleStatement<Aug>,
7311) -> Result<StatementDesc, PlanError> {
7312    Ok(StatementDesc::new(None))
7313}
7314
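// Illustrative usage (role and variable names are hypothetical); an ALTER
// ROLE either updates role attributes or sets a role-scoped variable:
//
//     ALTER ROLE data_team SET cluster = 'analytics';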
7315pub fn plan_alter_role(
7316    scx: &StatementContext,
7317    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7318) -> Result<Plan, PlanError> {
7319    let option = match option {
7320        AlterRoleOption::Attributes(attrs) => {
7321            let attrs = plan_role_attributes(attrs, scx)?;
7322            PlannedAlterRoleOption::Attributes(attrs)
7323        }
7324        AlterRoleOption::Variable(variable) => {
7325            let var = plan_role_variable(variable)?;
7326            PlannedAlterRoleOption::Variable(var)
7327        }
7328    };
7329
7330    Ok(Plan::AlterRole(AlterRolePlan {
7331        id: name.id,
7332        name: name.name,
7333        option,
7334    }))
7335}
7336
7337pub fn describe_alter_table_add_column(
7338    _: &StatementContext,
7339    _: AlterTableAddColumnStatement<Aug>,
7340) -> Result<StatementDesc, PlanError> {
7341    Ok(StatementDesc::new(None))
7342}
7343
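// A sketch of the feature-flagged statement this function plans (table and
// column names are hypothetical); added columns are currently always
// nullable:
//
//     ALTER TABLE purchases ADD COLUMN IF NOT EXISTS note text;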
7344pub fn plan_alter_table_add_column(
7345    scx: &StatementContext,
7346    stmt: AlterTableAddColumnStatement<Aug>,
7347) -> Result<Plan, PlanError> {
7348    let AlterTableAddColumnStatement {
7349        if_exists,
7350        name,
7351        if_col_not_exist,
7352        column_name,
7353        data_type,
7354    } = stmt;
7355    let object_type = ObjectType::Table;
7356
7357    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7358
7359    let (relation_id, item_name, desc) =
7360        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7361            Some(item) => {
7362                // Always add columns to the latest version of the item.
7363                let item_name = scx.catalog.resolve_full_name(item.name());
7364                let item = item.at_version(RelationVersionSelector::Latest);
7365                let desc = item.relation_desc().expect("table has desc").into_owned();
7366                (item.id(), item_name, desc)
7367            }
7368            None => {
7369                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7370                    name: name.to_ast_string_simple(),
7371                    object_type,
7372                });
7373                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7374            }
7375        };
7376
7377    let column_name = ColumnName::from(column_name.as_str());
7378    if desc.get_by_name(&column_name).is_some() {
7379        if if_col_not_exist {
7380            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7381                column_name: column_name.to_string(),
7382                object_name: item_name.item,
7383            });
7384            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7385        } else {
7386            return Err(PlanError::ColumnAlreadyExists {
7387                column_name,
7388                object_name: item_name.item,
7389            });
7390        }
7391    }
7392
7393    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7394    // TODO(alter_table): Support non-nullable columns with default values.
7395    let column_type = scalar_type.nullable(true);
7396    // "unresolve" our data type so we can later update the persisted create_sql.
7397    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7398
7399    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7400        relation_id,
7401        column_name,
7402        column_type,
7403        raw_sql_type,
7404    }))
7405}
7406
7407pub fn describe_alter_materialized_view_apply_replacement(
7408    _: &StatementContext,
7409    _: AlterMaterializedViewApplyReplacementStatement,
7410) -> Result<StatementDesc, PlanError> {
7411    Ok(StatementDesc::new(None))
7412}
7413
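// A rough sketch of the feature-flagged statement this function plans (the
// exact grammar lives in the parser; names here are hypothetical). The
// replacement must have been created as a replacement for the target view:
//
//     ALTER MATERIALIZED VIEW winning_bids APPLY REPLACEMENT winning_bids_v2;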
7414pub fn plan_alter_materialized_view_apply_replacement(
7415    scx: &StatementContext,
7416    stmt: AlterMaterializedViewApplyReplacementStatement,
7417) -> Result<Plan, PlanError> {
7418    let AlterMaterializedViewApplyReplacementStatement {
7419        if_exists,
7420        name,
7421        replacement_name,
7422    } = stmt;
7423
7424    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7425
7426    let object_type = ObjectType::MaterializedView;
7427    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7428        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7429            name: name.to_ast_string_simple(),
7430            object_type,
7431        });
7432        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7433    };
7434
7435    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7436        .expect("if_exists not set");
7437
7438    if replacement.replacement_target() != Some(mv.id()) {
7439        return Err(PlanError::InvalidReplacement {
7440            item_type: mv.item_type(),
7441            item_name: scx.catalog.minimal_qualification(mv.name()),
7442            replacement_type: replacement.item_type(),
7443            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7444        });
7445    }
7446
7447    Ok(Plan::AlterMaterializedViewApplyReplacement(
7448        AlterMaterializedViewApplyReplacementPlan {
7449            id: mv.id(),
7450            replacement_id: replacement.id(),
7451        },
7452    ))
7453}
7454
7455pub fn describe_comment(
7456    _: &StatementContext,
7457    _: CommentStatement<Aug>,
7458) -> Result<StatementDesc, PlanError> {
7459    Ok(StatementDesc::new(None))
7460}
7461
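// Illustrative usage (object names are hypothetical); a NULL comment clears
// any existing comment:
//
//     COMMENT ON TABLE purchases IS 'All purchase events.';
//     COMMENT ON COLUMN purchases.quantity IS NULL;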
7462pub fn plan_comment(
7463    scx: &mut StatementContext,
7464    stmt: CommentStatement<Aug>,
7465) -> Result<Plan, PlanError> {
7466    const MAX_COMMENT_LENGTH: usize = 1024;
7467
7468    let CommentStatement { object, comment } = stmt;
7469
7470    // TODO(parkmycar): Make max comment length configurable.
7471    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
7473            return Err(PlanError::CommentTooLong {
7474                length: c.len(),
7475                max_size: MAX_COMMENT_LENGTH,
7476            });
7477        }
7478    }
7479
7480    let (object_id, column_pos) = match &object {
7481        com_ty @ CommentObjectType::Table { name }
7482        | com_ty @ CommentObjectType::View { name }
7483        | com_ty @ CommentObjectType::MaterializedView { name }
7484        | com_ty @ CommentObjectType::Index { name }
7485        | com_ty @ CommentObjectType::Func { name }
7486        | com_ty @ CommentObjectType::Connection { name }
7487        | com_ty @ CommentObjectType::Source { name }
7488        | com_ty @ CommentObjectType::Sink { name }
7489        | com_ty @ CommentObjectType::Secret { name }
7490        | com_ty @ CommentObjectType::ContinualTask { name } => {
7491            let item = scx.get_item_by_resolved_name(name)?;
7492            match (com_ty, item.item_type()) {
7493                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7494                    (CommentObjectId::Table(item.id()), None)
7495                }
7496                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7497                    (CommentObjectId::View(item.id()), None)
7498                }
7499                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7500                    (CommentObjectId::MaterializedView(item.id()), None)
7501                }
7502                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7503                    (CommentObjectId::Index(item.id()), None)
7504                }
7505                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7506                    (CommentObjectId::Func(item.id()), None)
7507                }
7508                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7509                    (CommentObjectId::Connection(item.id()), None)
7510                }
7511                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7512                    (CommentObjectId::Source(item.id()), None)
7513                }
7514                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7515                    (CommentObjectId::Sink(item.id()), None)
7516                }
7517                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7518                    (CommentObjectId::Secret(item.id()), None)
7519                }
7520                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7521                    (CommentObjectId::ContinualTask(item.id()), None)
7522                }
7523                (com_ty, cat_ty) => {
7524                    let expected_type = match com_ty {
7525                        CommentObjectType::Table { .. } => ObjectType::Table,
7526                        CommentObjectType::View { .. } => ObjectType::View,
7527                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7528                        CommentObjectType::Index { .. } => ObjectType::Index,
7529                        CommentObjectType::Func { .. } => ObjectType::Func,
7530                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7531                        CommentObjectType::Source { .. } => ObjectType::Source,
7532                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
7535                    };
7536
7537                    return Err(PlanError::InvalidObjectType {
7538                        expected_type: SystemObjectType::Object(expected_type),
7539                        actual_type: SystemObjectType::Object(cat_ty.into()),
7540                        object_name: item.name().item.clone(),
7541                    });
7542                }
7543            }
7544        }
7545        CommentObjectType::Type { ty } => match ty {
7546            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7547                sql_bail!("cannot comment on anonymous list or map type");
7548            }
7549            ResolvedDataType::Named { id, modifiers, .. } => {
7550                if !modifiers.is_empty() {
7551                    sql_bail!("cannot comment on type with modifiers");
7552                }
7553                (CommentObjectId::Type(*id), None)
7554            }
7555            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7556        },
7557        CommentObjectType::Column { name } => {
7558            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7559            match item.item_type() {
7560                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7561                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7562                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7563                CatalogItemType::MaterializedView => {
7564                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7565                }
7566                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7567                r => {
7568                    return Err(PlanError::Unsupported {
7569                        feature: format!("Specifying comments on a column of {r}"),
7570                        discussion_no: None,
7571                    });
7572                }
7573            }
7574        }
7575        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7576        CommentObjectType::Database { name } => {
7577            (CommentObjectId::Database(*name.database_id()), None)
7578        }
7579        CommentObjectType::Schema { name } => {
7580            // Temporary schemas cannot have comments - they are connection-specific
7581            // and transient. With lazy temporary schema creation, the temp schema
7582            // may not exist yet, but we still need to return the correct error.
7583            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7584                sql_bail!(
7585                    "cannot comment on schema {} because it is a temporary schema",
7586                    mz_repr::namespaces::MZ_TEMP_SCHEMA
7587                );
7588            }
7589            (
7590                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7591                None,
7592            )
7593        }
7594        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7595        CommentObjectType::ClusterReplica { name } => {
7596            let replica = scx.catalog.resolve_cluster_replica(name)?;
7597            (
7598                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7599                None,
7600            )
7601        }
7602        CommentObjectType::NetworkPolicy { name } => {
7603            (CommentObjectId::NetworkPolicy(name.id), None)
7604        }
7605    };
7606
    // Note: the `mz_comments` table uses an `Int4` for the column position, but in catalog
    // storage we store a `usize`, which would be a `Uint8`. We do the checked conversion here
    // because it's the easiest place to raise an error.
7610    //
7611    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7612    if let Some(p) = column_pos {
7613        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7614            max_num_columns: MAX_NUM_COLUMNS,
7615            req_num_columns: p,
7616        })?;
7617    }
7618
7619    Ok(Plan::Comment(CommentPlan {
7620        object_id,
7621        sub_component: column_pos,
7622        comment,
7623    }))
7624}
7625
7626pub(crate) fn resolve_cluster<'a>(
7627    scx: &'a StatementContext,
7628    name: &'a Ident,
7629    if_exists: bool,
7630) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7631    match scx.resolve_cluster(Some(name)) {
7632        Ok(cluster) => Ok(Some(cluster)),
7633        Err(_) if if_exists => Ok(None),
7634        Err(e) => Err(e),
7635    }
7636}
7637
7638pub(crate) fn resolve_cluster_replica<'a>(
7639    scx: &'a StatementContext,
7640    name: &QualifiedReplica,
7641    if_exists: bool,
7642) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7643    match scx.resolve_cluster(Some(&name.cluster)) {
7644        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7645            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7646            None if if_exists => Ok(None),
7647            None => Err(sql_err!(
7648                "CLUSTER {} has no CLUSTER REPLICA named {}",
7649                cluster.name(),
7650                name.replica.as_str().quoted(),
7651            )),
7652        },
7653        Err(_) if if_exists => Ok(None),
7654        Err(e) => Err(e),
7655    }
7656}
7657
7658pub(crate) fn resolve_database<'a>(
7659    scx: &'a StatementContext,
7660    name: &'a UnresolvedDatabaseName,
7661    if_exists: bool,
7662) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7663    match scx.resolve_database(name) {
7664        Ok(database) => Ok(Some(database)),
7665        Err(_) if if_exists => Ok(None),
7666        Err(e) => Err(e),
7667    }
7668}
7669
7670pub(crate) fn resolve_schema<'a>(
7671    scx: &'a StatementContext,
7672    name: UnresolvedSchemaName,
7673    if_exists: bool,
7674) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7675    match scx.resolve_schema(name) {
7676        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7677        Err(_) if if_exists => Ok(None),
7678        Err(e) => Err(e),
7679    }
7680}
7681
7682pub(crate) fn resolve_network_policy<'a>(
7683    scx: &'a StatementContext,
7684    name: Ident,
7685    if_exists: bool,
7686) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7687    match scx.catalog.resolve_network_policy(&name.to_string()) {
7688        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7689            id: policy.id(),
7690            name: policy.name().to_string(),
7691        })),
7692        Err(_) if if_exists => Ok(None),
7693        Err(e) => Err(e.into()),
7694    }
7695}
7696
7697pub(crate) fn resolve_item_or_type<'a>(
7698    scx: &'a StatementContext,
7699    object_type: ObjectType,
7700    name: UnresolvedItemName,
7701    if_exists: bool,
7702) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7703    let name = normalize::unresolved_item_name(name)?;
7704    let catalog_item = match object_type {
7705        ObjectType::Type => scx.catalog.resolve_type(&name),
7706        _ => scx.catalog.resolve_item(&name),
7707    };
7708
7709    match catalog_item {
7710        Ok(item) => {
7711            let is_type = ObjectType::from(item.item_type());
7712            if object_type == is_type {
7713                Ok(Some(item))
7714            } else {
7715                Err(PlanError::MismatchedObjectType {
7716                    name: scx.catalog.minimal_qualification(item.name()),
7717                    is_type,
7718                    expected_type: object_type,
7719                })
7720            }
7721        }
7722        Err(_) if if_exists => Ok(None),
7723        Err(e) => Err(e.into()),
7724    }
7725}
7726
/// Returns an error if the given cluster is a managed cluster.
7728fn ensure_cluster_is_not_managed(
7729    scx: &StatementContext,
7730    cluster_id: ClusterId,
7731) -> Result<(), PlanError> {
7732    let cluster = scx.catalog.get_cluster(cluster_id);
7733    if cluster.is_managed() {
7734        Err(PlanError::ManagedCluster {
7735            cluster_name: cluster.name().to_string(),
7736        })
7737    } else {
7738        Ok(())
7739    }
7740}