mz_sql/plan/statement/ddl.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL).
//!
//! This module houses the handlers for statements that modify the catalog, like
//! `ALTER`, `CREATE`, and `DROP`.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_ore::vec::VecExt;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, ColumnType, RelationDesc, RelationType, RelationVersion,
    RelationVersionSelector, ScalarType, Timestamp, VersionedRelationDesc, preserves_order,
    strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, SinkEnvelope,
    StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::SqlServerSourceExportDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSource, SqlServerSourceExtras,
    Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, Index, Ingestion,
    MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection,
    Params, Plan, PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig,
    Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
    WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
    transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

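/// Matches replica names of the form `r<N>` (e.g. `r1`), the naming scheme
/// reserved for replicas of managed clusters.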
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

/// Given a relation desc and a column list, checks that:
/// - the column list is a prefix of the desc;
/// - all the listed columns have types with a meaningful Persist-level ordering.
fn check_partition_by(desc: &RelationDesc, partition_by: Vec<Ident>) -> Result<(), PlanError> {
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc.iter().zip(partition_by.into_iter()).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

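// DDL statements return no rows, so each of the `describe_*` handlers in this
// module reports a `StatementDesc` with no relation description.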
pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

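/// Plans `CREATE SCHEMA`, resolving an explicit database component if present
/// and otherwise falling back to the session's active database.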
pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

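/// Plans `CREATE TABLE`, translating column definitions, defaults, and
/// (unenforced) key constraints into a `CreateTablePlan` backed by table writes.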
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.as_str().quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

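                // List the primary key first among the relation's keys.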
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = RelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

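// Each `generate_extracted_config!` invocation below expands to a
// `<Name>Extracted` struct (e.g. `CreateSourceOptionExtracted`) with one typed
// field per option plus a `seen` field recording which options were specified.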
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

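/// Plans `CREATE SOURCE ... FROM WEBHOOK`, producing either a source plan or,
/// when the statement is the table variant, a table plan.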
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    if in_cluster.replica_ids().len() > 1 {
        sql_bail!("cannot create webhook source in cluster with more than one replica")
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        ColumnType {
            scalar_type: ScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(ColumnType {
            scalar_type: ScalarType::Map {
                value_type: Box::new(ScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(ScalarType::Bytes)
            .unwrap_or(ScalarType::String);
        column_ty.push(ColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = RelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

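/// Plans `CREATE SOURCE` for external systems (Kafka, PostgreSQL/Yugabyte,
/// SQL Server, MySQL, and load generators), translating the purified AST
/// into a `CreateSourcePlan`.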
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, all options related to the primary
    // source output must be unset.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    let external_connection = match source_connection {
        CreateSourceConnection::Kafka {
            connection: connection_name,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection_name)?;
            if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
                sql_bail!(
                    "{} is not a kafka connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let KafkaSourceConfigOptionExtracted {
                group_id_prefix,
                topic,
                topic_metadata_refresh_interval,
                start_timestamp: _, // purified into `start_offset`
                start_offset,
                seen: _,
            }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;

            let topic = topic.expect("validated exists during purification");

            let mut start_offsets = BTreeMap::new();
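            // Offsets are listed in partition order: the offset at position
            // `part` applies to Kafka partition `part`.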
            if let Some(offsets) = start_offset {
                for (part, offset) in offsets.iter().enumerate() {
                    if *offset < 0 {
                        sql_bail!("START OFFSET must be a nonnegative integer");
                    }
                    start_offsets.insert(i32::try_from(part)?, *offset);
                }
            }

            if !start_offsets.is_empty() && envelope.requires_all_input() {
                sql_bail!("START OFFSET is not supported with ENVELOPE {}", envelope)
            }

            if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
                // This is a librdkafka-enforced restriction that, if violated,
                // would result in a runtime error for the source.
                sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
            }

            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                // TODO(guswynn): should this be `bail_unsupported!`?
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // This defines the metadata columns for the primary export of this kafka source,
            // not any tables that are created from it.
            // TODO: Remove this when we stop outputting to the primary export of a source.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // handled below
                        None
                    }
                })
                .collect();

            let connection = KafkaSourceConnection::<ReferencedConnection> {
                connection: connection_item.id(),
                connection_id: connection_item.id(),
                topic,
                start_offsets,
                group_id_prefix,
                topic_metadata_refresh_interval,
                metadata_columns,
            };

            GenericSourceConnection::Kafka(connection)
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        }
        | CreateSourceConnection::Yugabyte {
            connection,
            options,
        } => {
            let (source_flavor, flavor_name) = match source_connection {
                CreateSourceConnection::Postgres { .. } => (PostgresFlavor::Vanilla, "PostgreSQL"),
                CreateSourceConnection::Yugabyte { .. } => (PostgresFlavor::Yugabyte, "Yugabyte"),
                _ => unreachable!(),
            };

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection_mismatch = match connection_item.connection()? {
                Connection::Postgres(conn) => source_flavor != conn.flavor,
                _ => true,
            };
            if connection_mismatch {
                sql_bail!(
                    "{} is not a {flavor_name} connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let PgConfigOptionExtracted {
                details,
                publication,
                // text columns are already part of the source-exports and are only included
                // in these options for round-tripping of a `CREATE SOURCE` statement. This should
                // be removed once we drop support for implicitly created subsources.
                text_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details = ProtoPostgresSourcePublicationDetails::decode(&*details)
                .map_err(|e| sql_err!("{}", e))?;

            let publication_details = PostgresSourcePublicationDetails::from_proto(details)
                .map_err(|e| sql_err!("{}", e))?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(PostgresSourceConnection {
                    connection: connection_item.id(),
                    connection_id: connection_item.id(),
                    publication: publication.expect("validated exists during purification"),
                    publication_details,
                });

            connection
        }
        CreateSourceConnection::SqlServer {
            connection,
            options: _,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::SqlServer(connection) => connection,
                _ => sql_bail!(
                    "{} is not a SQL Server connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };
            // TODO(sql_server2): Handle SQL Server connection options.

            let extras = SqlServerSourceExtras {};
            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(SqlServerSource {
                    catalog_id: connection_item.id(),
                    connection: connection_item.id(),
                    extras,
                });

            connection
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::MySql(connection) => connection,
                _ => sql_bail!(
                    "{} is not a MySQL connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };
            let MySqlConfigOptionExtracted {
                details,
                // text/exclude columns are already part of the source-exports and are only included
                // in these options for round-tripping of a `CREATE SOURCE` statement. This should
                // be removed once we drop support for implicitly created subsources.
                text_columns: _,
                exclude_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details =
                ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
            let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(MySqlSourceConnection {
                    connection: connection_item.id(),
                    connection_id: connection_item.id(),
                    details,
                });

            connection
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(scx, generator, options, include_metadata)?;

            let LoadGeneratorOptionExtracted {
                tick_interval,
                as_of,
                up_to,
                ..
            } = options.clone().try_into()?;
            let tick_micros = match tick_interval {
                Some(interval) => Some(interval.as_micros().try_into()?),
                None => None,
            };

            if up_to < as_of {
                sql_bail!("UP TO cannot be less than AS OF");
            }

            let connection = GenericSourceConnection::from(LoadGeneratorSourceConnection {
                load_generator,
                tick_micros,
                as_of,
                up_to,
            });

            connection
        }
    };

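    // Extract the `WITH (...)` options that are common to all source types.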
    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.as_str().quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (primary_export, primary_export_details) = if force_source_table_syntax {
        (
            SourceExportDataConfig {
                encoding: None,
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
            },
            SourceExportDetails::None,
        )
    } else {
        (
            SourceExportDataConfig {
                encoding,
                envelope: envelope.clone(),
            },
            external_connection.primary_export_details(),
        )
    };
    let source_desc = SourceDesc::<ReferencedConnection> {
        connection: external_connection,
        // We only define primary-export details for this source if we are still supporting
        // the legacy source syntax. Otherwise, we will not output to the primary collection.
        // TODO(database-issues#8620): Remove this field once the new syntax is enabled everywhere
        primary_export,
        primary_export_details,
        timestamp_interval,
    };

    let progress_subsource = match progress_subsource {
        Some(name) => match name {
            DeferredItemName::Named(name) => match name {
                ResolvedItemName::Item { id, .. } => *id,
                ResolvedItemName::Cte { .. }
                | ResolvedItemName::ContinualTask { .. }
                | ResolvedItemName::Error => {
                    sql_bail!("[internal error] invalid target id")
                }
            },
            DeferredItemName::Deferred(_) => {
                sql_bail!("[internal error] progress subsource must be named during purification")
            }
        },
        _ => sql_bail!("[internal error] progress subsource must be named during purification"),
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source: DataSourceDesc::Ingestion(Ingestion {
            desc: source_desc,
            progress_subsource,
        }),
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

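/// Determines the relation description, planned envelope, and optional data
/// encoding for a source's primary export, based on the requested envelope and
/// format together with the connection's default key and value descriptions.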
1247fn apply_source_envelope_encoding(
1248    scx: &StatementContext,
1249    envelope: &ast::SourceEnvelope,
1250    format: &Option<FormatSpecifier<Aug>>,
1251    key_desc: Option<RelationDesc>,
1252    value_desc: RelationDesc,
1253    include_metadata: &[SourceIncludeMetadata],
1254    metadata_columns_desc: Vec<(&str, ColumnType)>,
1255    source_connection: &GenericSourceConnection<ReferencedConnection>,
1256) -> Result<
1257    (
1258        RelationDesc,
1259        SourceEnvelope,
1260        Option<SourceDataEncoding<ReferencedConnection>>,
1261    ),
1262    PlanError,
1263> {
1264    let encoding = match format {
1265        Some(format) => Some(get_encoding(scx, format, envelope)?),
1266        None => None,
1267    };
1268
1269    let (key_desc, value_desc) = match &encoding {
1270        Some(encoding) => {
1271            // If we are applying an encoding we need to ensure that the incoming value_desc is a
1272            // single column of type bytes.
1273            match value_desc.typ().columns() {
1274                [typ] => match typ.scalar_type {
1275                    ScalarType::Bytes => {}
1276                    _ => sql_bail!(
1277                        "The schema produced by the source is incompatible with format decoding"
1278                    ),
1279                },
1280                _ => sql_bail!(
1281                    "The schema produced by the source is incompatible with format decoding"
1282                ),
1283            }
1284
1285            let (key_desc, value_desc) = encoding.desc()?;
1286
1287            // TODO(petrosagg): This piece of code seems to be making a statement about the
1288            // nullability of the NONE envelope when the source is Kafka. As written, the code
1289            // misses opportunities to mark columns as not nullable and is over conservative. For
1290            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
1291            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
1292            // should be replaced with precise type-level reasoning.
1293            let key_desc = key_desc.map(|desc| {
1294                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1295                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1296                if is_kafka && is_envelope_none {
1297                    RelationDesc::from_names_and_types(
1298                        desc.into_iter()
1299                            .map(|(name, typ)| (name, typ.nullable(true))),
1300                    )
1301                } else {
1302                    desc
1303                }
1304            });
1305            (key_desc, value_desc)
1306        }
1307        None => (key_desc, value_desc),
1308    };
1309
1310    // KEY VALUE load generators are the only UPSERT source that
1311    // has no encoding but defaults to `INCLUDE KEY`.
1312    //
1313    // As discussed
1314    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1315    // removing this special case amounts to deciding how to handle null keys
1316    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
1317    // this special case in.
1318    //
1319    // Note that this is safe because this generator is
1320    // 1. The only source with no encoding that can have its key included.
1321    // 2. Never produces null keys (or values, for that matter).
1322    let key_envelope_no_encoding = matches!(
1323        source_connection,
1324        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1325            load_generator: LoadGenerator::KeyValue(_),
1326            ..
1327        })
1328    );
1329    let mut key_envelope = get_key_envelope(
1330        include_metadata,
1331        encoding.as_ref(),
1332        key_envelope_no_encoding,
1333    )?;
1334
1335    match (&envelope, &key_envelope) {
1336        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1337        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1338            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1339        ),
1340        _ => {}
1341    };
1342
1343    // Not all source envelopes are compatible with all source connections.
1344    // Whoever constructs the source ingestion pipeline is responsible for
1345    // choosing compatible envelopes and connections.
1346    //
1347    // TODO(guswynn): unambiguously assert which connections and envelopes are
1348    // compatible in typechecking
1349    //
1350    // TODO: remove bails as more support for upsert is added.
1351    let envelope = match &envelope {
1352        // TODO: fixup key envelope
1353        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1354        ast::SourceEnvelope::Debezium => {
1355            // TODO: check that key envelope is not set
1356            let after_idx = match typecheck_debezium(&value_desc) {
1357                Ok((_before_idx, after_idx)) => Ok(after_idx),
1358                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1359                    Some(DataEncoding::Avro(_)) => Err(type_err),
1360                    _ => Err(sql_err!(
1361                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1362                    )),
1363                },
1364            }?;
1365
1366            UnplannedSourceEnvelope::Upsert {
1367                style: UpsertStyle::Debezium { after_idx },
1368            }
1369        }
1370        ast::SourceEnvelope::Upsert {
1371            value_decode_err_policy,
1372        } => {
1373            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1374                None => {
1375                    if !key_envelope_no_encoding {
1376                        bail_unsupported!(format!(
1377                            "UPSERT requires a key/value format: {:?}",
1378                            format
1379                        ))
1380                    }
1381                    None
1382                }
1383                Some(key_encoding) => Some(key_encoding),
1384            };
1385            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
1386            // specified.
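            // For example (sketch): `... KEY FORMAT TEXT VALUE FORMAT TEXT ENVELOPE UPSERT`
            // without an `INCLUDE KEY` clause still surfaces the key, under the default
            // name chosen by `get_unnamed_key_envelope`.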
1387            if key_envelope == KeyEnvelope::None {
1388                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1389            }
1390            // If the value decode error policy is not set we use the default upsert style.
1391            let style = match value_decode_err_policy.as_slice() {
1392                [] => UpsertStyle::Default(key_envelope),
1393                [SourceErrorPolicy::Inline { alias }] => {
1394                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1395                    UpsertStyle::ValueErrInline {
1396                        key_envelope,
1397                        error_column: alias
1398                            .as_ref()
1399                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1400                    }
1401                }
1402                _ => {
1403                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1404                }
1405            };
1406
1407            UnplannedSourceEnvelope::Upsert { style }
1408        }
1409        ast::SourceEnvelope::CdcV2 => {
1410            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1411            // TODO: check that key envelope is not set
1412            match format {
1413                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1414                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1415            }
1416            UnplannedSourceEnvelope::CdcV2
1417        }
1418    };
1419
1420    let metadata_desc = included_column_desc(metadata_columns_desc);
1421    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1422
1423    Ok((desc, envelope, encoding))
1424}
1425
1426/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1427/// of columns and constraints.
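///
/// For example (a hypothetical export, for illustration only): columns
/// `(a int4 NOT NULL, b text)` with the constraint `PRIMARY KEY (a)` plan to a
/// `RelationDesc` whose column types are `int4 NOT NULL` and `text` (nullable),
/// with `[0]` installed as the first key.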
1428fn plan_source_export_desc(
1429    scx: &StatementContext,
1430    name: &UnresolvedItemName,
1431    columns: &Vec<ColumnDef<Aug>>,
1432    constraints: &Vec<TableConstraint<Aug>>,
1433) -> Result<RelationDesc, PlanError> {
1434    let names: Vec<_> = columns
1435        .iter()
1436        .map(|c| normalize::column_name(c.name.clone()))
1437        .collect();
1438
1439    if let Some(dup) = names.iter().duplicates().next() {
1440        sql_bail!("column {} specified more than once", dup.as_str().quoted());
1441    }
1442
1443    // Build initial relation type that handles declared data types
1444    // and NOT NULL constraints.
1445    let mut column_types = Vec::with_capacity(columns.len());
1446    let mut keys = Vec::new();
1447
1448    for (i, c) in columns.into_iter().enumerate() {
1449        let aug_data_type = &c.data_type;
1450        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1451        let mut nullable = true;
1452        for option in &c.options {
1453            match &option.option {
1454                ColumnOption::NotNull => nullable = false,
1455                ColumnOption::Default(_) => {
1456                    bail_unsupported!("Source export with default value")
1457                }
1458                ColumnOption::Unique { is_primary } => {
1459                    keys.push(vec![i]);
1460                    if *is_primary {
1461                        nullable = false;
1462                    }
1463                }
1464                other => {
1465                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1466                }
1467            }
1468        }
1469        column_types.push(ty.nullable(nullable));
1470    }
1471
1472    let mut seen_primary = false;
1473    'c: for constraint in constraints {
1474        match constraint {
1475            TableConstraint::Unique {
1476                name: _,
1477                columns,
1478                is_primary,
1479                nulls_not_distinct,
1480            } => {
1481                if seen_primary && *is_primary {
1482                    sql_bail!(
1483                        "multiple primary keys for source export {} are not allowed",
1484                        name.to_ast_string_stable()
1485                    );
1486                }
1487                seen_primary = *is_primary || seen_primary;
1488
1489                let mut key = vec![];
1490                for column in columns {
1491                    let column = normalize::column_name(column.clone());
1492                    match names.iter().position(|name| *name == column) {
1493                        None => sql_bail!("unknown column in constraint: {}", column),
1494                        Some(i) => {
1495                            let nullable = &mut column_types[i].nullable;
1496                            if *is_primary {
1497                                if *nulls_not_distinct {
1498                                    sql_bail!(
1499                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1500                                    );
1501                                }
1502                                *nullable = false;
1503                            } else if !(*nulls_not_distinct || !*nullable) {
1504                                // Non-primary key unique constraints are only keys if all of their
1505                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
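                                // For example (sketch): `UNIQUE (a, b)` where `b` is
                                // nullable is not usable as a key, since two rows may
                                // both have `b = NULL`; `UNIQUE NULLS NOT DISTINCT (a, b)`
                                // is, because NULLs then compare equal.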
1506                                break 'c;
1507                            }
1508
1509                            key.push(i);
1510                        }
1511                    }
1512                }
1513
1514                if *is_primary {
1515                    keys.insert(0, key);
1516                } else {
1517                    keys.push(key);
1518                }
1519            }
1520            TableConstraint::ForeignKey { .. } => {
1521                bail_unsupported!("Source export with a foreign key")
1522            }
1523            TableConstraint::Check { .. } => {
1524                bail_unsupported!("Source export with a check constraint")
1525            }
1526        }
1527    }
1528
1529    let typ = RelationType::new(column_types).with_keys(keys);
1530    let desc = RelationDesc::new(typ, names);
1531    Ok(desc)
1532}
1533
1534generate_extracted_config!(
1535    CreateSubsourceOption,
1536    (Progress, bool, Default(false)),
1537    (ExternalReference, UnresolvedItemName),
1538    (TextColumns, Vec::<Ident>, Default(vec![])),
1539    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1540    (Details, String)
1541);
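// A sketch of what the macro above expands to (the authoritative definition
// lives with `generate_extracted_config!`): a `CreateSubsourceOptionExtracted`
// struct with one typed field per option (`progress: bool`,
// `external_reference: Option<UnresolvedItemName>`, ...), plus a `seen` set
// recording which options were explicitly provided, and a `TryFrom` impl over
// the option list that rejects duplicate options. The destructuring in
// `plan_create_subsource` below relies on exactly this shape.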
1542
1543pub fn plan_create_subsource(
1544    scx: &StatementContext,
1545    stmt: CreateSubsourceStatement<Aug>,
1546) -> Result<Plan, PlanError> {
1547    let CreateSubsourceStatement {
1548        name,
1549        columns,
1550        of_source,
1551        constraints,
1552        if_not_exists,
1553        with_options,
1554    } = &stmt;
1555
1556    let CreateSubsourceOptionExtracted {
1557        progress,
1558        external_reference,
1559        text_columns,
1560        exclude_columns,
1561        details,
1562        seen: _,
1563    } = with_options.clone().try_into()?;
1564
1565    // This invariant is enforced during purification; we are responsible for
1566    // creating the AST for subsources as a response to CREATE SOURCE
1567    // statements, so this would fire in integration testing if we failed to
1568    // uphold it.
1569    assert!(
1570        progress ^ (external_reference.is_some() && of_source.is_some()),
1571        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1572    );
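    // Concretely (a sketch of the two valid shapes): a progress subsource has
    // `PROGRESS` set and neither an external reference nor a parent source,
    // while a data-carrying subsource has both `EXTERNAL REFERENCE ...` and
    // `OF SOURCE ...` but not `PROGRESS`; anything else trips the XOR above.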
1573
1574    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1575
1576    let data_source = if let Some(source_reference) = of_source {
1577        // If the new source table syntax is forced we should not be creating any non-progress
1578        // subsources.
1579        if scx.catalog.system_vars().enable_create_table_from_source()
1580            && scx.catalog.system_vars().force_source_table_syntax()
1581        {
1582            Err(PlanError::UseTablesForSources(
1583                "CREATE SUBSOURCE".to_string(),
1584            ))?;
1585        }
1586
1587        // This is a subsource with the "natural" dependency order, i.e. it is
1588        // not a legacy subsource with the inverted structure.
1589        let ingestion_id = *source_reference.item_id();
1590        let external_reference = external_reference.unwrap();
1591
1592        // Decode the details option stored on the subsource statement, which contains information
1593        // created during the purification process.
1594        let details = details
1595            .as_ref()
1596            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1597        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1598        let details =
1599            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1600        let details =
1601            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
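        // The decoding above is a three-step round-trip (sketched): the WITH option
        // carries hex-encoded bytes, the bytes are a serialized
        // `ProtoSourceExportStatementDetails`, and the proto message is converted
        // into the in-memory `SourceExportStatementDetails` enum matched below.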
1602        let details = match details {
1603            SourceExportStatementDetails::Postgres { table } => {
1604                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1605                    column_casts: crate::pure::postgres::generate_column_casts(
1606                        scx,
1607                        &table,
1608                        &text_columns,
1609                    )?,
1610                    table,
1611                })
1612            }
1613            SourceExportStatementDetails::MySql {
1614                table,
1615                initial_gtid_set,
1616            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1617                table,
1618                initial_gtid_set,
1619                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1620                exclude_columns: exclude_columns
1621                    .into_iter()
1622                    .map(|c| c.into_string())
1623                    .collect(),
1624            }),
1625            SourceExportStatementDetails::SqlServer {
1626                table,
1627                capture_instance,
1628            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1629                capture_instance,
1630                table,
1631                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1632                exclude_columns: exclude_columns
1633                    .into_iter()
1634                    .map(|c| c.into_string())
1635                    .collect(),
1636            }),
1637            SourceExportStatementDetails::LoadGenerator { output } => {
1638                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1639            }
1640            SourceExportStatementDetails::Kafka {} => {
1641                bail_unsupported!("subsources cannot reference Kafka sources")
1642            }
1643        };
1644        DataSourceDesc::IngestionExport {
1645            ingestion_id,
1646            external_reference,
1647            details,
1648            // Subsources don't currently support non-default envelopes / encoding
1649            data_config: SourceExportDataConfig {
1650                envelope: SourceEnvelope::None(NoneEnvelope {
1651                    key_envelope: KeyEnvelope::None,
1652                    key_arity: 0,
1653                }),
1654                encoding: None,
1655            },
1656        }
1657    } else if progress {
1658        DataSourceDesc::Progress
1659    } else {
1660        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1661    };
1662
1663    let if_not_exists = *if_not_exists;
1664    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1665
1666    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1667
1668    let source = Source {
1669        create_sql,
1670        data_source,
1671        desc,
1672        compaction_window: None,
1673    };
1674
1675    Ok(Plan::CreateSource(CreateSourcePlan {
1676        name,
1677        source,
1678        if_not_exists,
1679        timeline: Timeline::EpochMilliseconds,
1680        in_cluster: None,
1681    }))
1682}
1683
1684generate_extracted_config!(
1685    TableFromSourceOption,
1686    (TextColumns, Vec::<Ident>, Default(vec![])),
1687    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1688    (PartitionBy, Vec<Ident>),
1689    (Details, String)
1690);
1691
1692pub fn plan_create_table_from_source(
1693    scx: &StatementContext,
1694    stmt: CreateTableFromSourceStatement<Aug>,
1695) -> Result<Plan, PlanError> {
1696    if !scx.catalog.system_vars().enable_create_table_from_source() {
1697        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1698    }
1699
1700    let CreateTableFromSourceStatement {
1701        name,
1702        columns,
1703        constraints,
1704        if_not_exists,
1705        source,
1706        external_reference,
1707        envelope,
1708        format,
1709        include_metadata,
1710        with_options,
1711    } = &stmt;
1712
1713    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1714
1715    let TableFromSourceOptionExtracted {
1716        text_columns,
1717        exclude_columns,
1718        partition_by,
1719        details,
1720        seen: _,
1721    } = with_options.clone().try_into()?;
1722
1723    let source_item = scx.get_item_by_resolved_name(source)?;
1724    let ingestion_id = source_item.id();
1725
1726    // Decode the details option stored on the statement, which contains information
1727    // created during the purification process.
1728    let details = details
1729        .as_ref()
1730        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1731    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1732    let details =
1733        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1734    let details =
1735        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1736
1737    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1738        && include_metadata
1739            .iter()
1740            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1741    {
1742        // TODO(guswynn): should this be `bail_unsupported!`?
1743        sql_bail!("INCLUDE HEADERS with a non-Kafka source table is not supported");
1744    }
1745    if !matches!(
1746        details,
1747        SourceExportStatementDetails::Kafka { .. }
1748            | SourceExportStatementDetails::LoadGenerator { .. }
1749    ) && !include_metadata.is_empty()
1750    {
1751        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1752    }
1753
1754    let details = match details {
1755        SourceExportStatementDetails::Postgres { table } => {
1756            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1757                column_casts: crate::pure::postgres::generate_column_casts(
1758                    scx,
1759                    &table,
1760                    &text_columns,
1761                )?,
1762                table,
1763            })
1764        }
1765        SourceExportStatementDetails::MySql {
1766            table,
1767            initial_gtid_set,
1768        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1769            table,
1770            initial_gtid_set,
1771            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1772            exclude_columns: exclude_columns
1773                .into_iter()
1774                .map(|c| c.into_string())
1775                .collect(),
1776        }),
1777        SourceExportStatementDetails::SqlServer {
1778            table,
1779            capture_instance,
1780        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1781            table,
1782            capture_instance,
1783            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1784            exclude_columns: exclude_columns
1785                .into_iter()
1786                .map(|c| c.into_string())
1787                .collect(),
1788        }),
1789        SourceExportStatementDetails::LoadGenerator { output } => {
1790            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1791        }
1792        SourceExportStatementDetails::Kafka {} => {
1793            if !include_metadata.is_empty()
1794                && !matches!(
1795                    envelope,
1796                    ast::SourceEnvelope::Upsert { .. }
1797                        | ast::SourceEnvelope::None
1798                        | ast::SourceEnvelope::Debezium
1799                )
1800            {
1801                // TODO(guswynn): should this be `bail_unsupported!`?
1802                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1803            }
1804
1805            let metadata_columns = include_metadata
1806                .into_iter()
1807                .flat_map(|item| match item {
1808                    SourceIncludeMetadata::Timestamp { alias } => {
1809                        let name = match alias {
1810                            Some(name) => name.to_string(),
1811                            None => "timestamp".to_owned(),
1812                        };
1813                        Some((name, KafkaMetadataKind::Timestamp))
1814                    }
1815                    SourceIncludeMetadata::Partition { alias } => {
1816                        let name = match alias {
1817                            Some(name) => name.to_string(),
1818                            None => "partition".to_owned(),
1819                        };
1820                        Some((name, KafkaMetadataKind::Partition))
1821                    }
1822                    SourceIncludeMetadata::Offset { alias } => {
1823                        let name = match alias {
1824                            Some(name) => name.to_string(),
1825                            None => "offset".to_owned(),
1826                        };
1827                        Some((name, KafkaMetadataKind::Offset))
1828                    }
1829                    SourceIncludeMetadata::Headers { alias } => {
1830                        let name = match alias {
1831                            Some(name) => name.to_string(),
1832                            None => "headers".to_owned(),
1833                        };
1834                        Some((name, KafkaMetadataKind::Headers))
1835                    }
1836                    SourceIncludeMetadata::Header {
1837                        alias,
1838                        key,
1839                        use_bytes,
1840                    } => Some((
1841                        alias.to_string(),
1842                        KafkaMetadataKind::Header {
1843                            key: key.clone(),
1844                            use_bytes: *use_bytes,
1845                        },
1846                    )),
1847                    SourceIncludeMetadata::Key { .. } => {
1848                        // handled below
1849                        None
1850                    }
1851                })
1852                .collect();
1853
1854            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1855        }
1856    };
1857
1858    let source_connection = &source_item.source_desc()?.expect("is source").connection;
1859
1860    // Some source types (e.g. postgres, mysql, multi-output load-gen sources) derive a value
1861    // schema during purification and populate the `columns` and `constraints` fields of the
1862    // statement, whereas other source types (e.g. kafka, single-output load-gen sources) do
1863    // not; for the latter we fall back to the source connection's default schema.
1864    let (key_desc, value_desc) =
1865        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1866            let columns = match columns {
1867                TableFromSourceColumns::Defined(columns) => columns,
1868                _ => unreachable!(),
1869            };
1870            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1871            (None, desc)
1872        } else {
1873            let key_desc = source_connection.default_key_desc();
1874            let value_desc = source_connection.default_value_desc();
1875            (Some(key_desc), value_desc)
1876        };
1877
1878    let metadata_columns_desc = match &details {
1879        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1880            metadata_columns, ..
1881        }) => kafka_metadata_columns_desc(metadata_columns),
1882        _ => vec![],
1883    };
1884
1885    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1886        scx,
1887        &envelope,
1888        format,
1889        key_desc,
1890        value_desc,
1891        include_metadata,
1892        metadata_columns_desc,
1893        source_connection,
1894    )?;
1895    if let TableFromSourceColumns::Named(col_names) = columns {
1896        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1897    }
1898
1899    let names: Vec<_> = desc.iter_names().cloned().collect();
1900    if let Some(dup) = names.iter().duplicates().next() {
1901        sql_bail!("column {} specified more than once", dup.as_str().quoted());
1902    }
1903
1904    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1905
1906    // Determine the timeline for the source: CDCv2 sources carry their own
1907    // timestamps and so get an external timeline; all others use EpochMilliseconds.
1908    let timeline = match envelope {
1909        SourceEnvelope::CdcV2 => {
1910            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1911        }
1912        _ => Timeline::EpochMilliseconds,
1913    };
1914
1915    if let Some(partition_by) = partition_by {
1916        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1917        check_partition_by(&desc, partition_by)?;
1918    }
1919
1920    let data_source = DataSourceDesc::IngestionExport {
1921        ingestion_id,
1922        external_reference: external_reference
1923            .as_ref()
1924            .expect("populated in purification")
1925            .clone(),
1926        details,
1927        data_config: SourceExportDataConfig { envelope, encoding },
1928    };
1929
1930    let if_not_exists = *if_not_exists;
1931
1932    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1933
1934    let table = Table {
1935        create_sql,
1936        desc: VersionedRelationDesc::new(desc),
1937        temporary: false,
1938        compaction_window: None,
1939        data_source: TableDataSource::DataSource {
1940            desc: data_source,
1941            timeline,
1942        },
1943    };
1944
1945    Ok(Plan::CreateTable(CreateTablePlan {
1946        name,
1947        table,
1948        if_not_exists,
1949    }))
1950}
1951
1952generate_extracted_config!(
1953    LoadGeneratorOption,
1954    (TickInterval, Duration),
1955    (AsOf, u64, Default(0_u64)),
1956    (UpTo, u64, Default(u64::MAX)),
1957    (ScaleFactor, f64),
1958    (MaxCardinality, u64),
1959    (Keys, u64),
1960    (SnapshotRounds, u64),
1961    (TransactionalSnapshot, bool),
1962    (ValueSize, u64),
1963    (Seed, u64),
1964    (Partitions, u64),
1965    (BatchSize, u64)
1966);
1967
1968impl LoadGeneratorOptionExtracted {
1969    pub(super) fn ensure_only_valid_options(
1970        &self,
1971        loadgen: &ast::LoadGenerator,
1972    ) -> Result<(), PlanError> {
1973        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
1974
1975        let mut options = self.seen.clone();
1976
1977        let permitted_options: &[_] = match loadgen {
1978            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
1979            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
1980            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
1981            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
1982            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
1983            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
1984            ast::LoadGenerator::KeyValue => &[
1985                TickInterval,
1986                Keys,
1987                SnapshotRounds,
1988                TransactionalSnapshot,
1989                ValueSize,
1990                Seed,
1991                Partitions,
1992                BatchSize,
1993            ],
1994        };
1995
1996        for o in permitted_options {
1997            options.remove(o);
1998        }
1999
2000        if !options.is_empty() {
2001            sql_bail!(
2002                "{} load generators do not support {} values",
2003                loadgen,
2004                options.iter().join(", ")
2005            )
2006        }
2007
2008        Ok(())
2009    }
2010}
2011
2012pub(crate) fn load_generator_ast_to_generator(
2013    scx: &StatementContext,
2014    loadgen: &ast::LoadGenerator,
2015    options: &[LoadGeneratorOption<Aug>],
2016    include_metadata: &[SourceIncludeMetadata],
2017) -> Result<LoadGenerator, PlanError> {
2018    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2019    extracted.ensure_only_valid_options(loadgen)?;
2020
2021    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2022        sql_bail!("INCLUDE metadata is only supported with `KEY VALUE` load generators");
2023    }
2024
2025    let load_generator = match loadgen {
2026        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2027        ast::LoadGenerator::Clock => {
2028            scx.require_feature_flag(&crate::session::vars::ENABLE_CLOCK_LOAD_GENERATOR)?;
2029            LoadGenerator::Clock
2030        }
2031        ast::LoadGenerator::Counter => {
2032            let LoadGeneratorOptionExtracted {
2033                max_cardinality, ..
2034            } = extracted;
2035            LoadGenerator::Counter { max_cardinality }
2036        }
2037        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2038        ast::LoadGenerator::Datums => LoadGenerator::Datums,
2039        ast::LoadGenerator::Tpch => {
2040            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2041
2042            // Default to 0.01 scale factor (=10MB).
2043            let sf: f64 = scale_factor.unwrap_or(0.01);
2044            if !sf.is_finite() || sf < 0.0 {
2045                sql_bail!("unsupported scale factor {sf}");
2046            }
2047
2048            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2049                let total = (sf * multiplier).floor();
2050                let mut i = i64::try_cast_from(total)
2051                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2052                if i < 1 {
2053                    i = 1;
2054                }
2055                Ok(i)
2056            };
2057
2058            // The multiplications here are safely unchecked because they will
2059            // overflow to infinity, which is caught by `i64::try_cast_from` in `f_to_i`.
2060            let count_supplier = f_to_i(10_000f64)?;
2061            let count_part = f_to_i(200_000f64)?;
2062            let count_customer = f_to_i(150_000f64)?;
2063            let count_orders = f_to_i(150_000f64 * 10f64)?;
2064            let count_clerk = f_to_i(1_000f64)?;
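            // Worked example: at the default scale factor of 0.01 this yields
            // 100 suppliers, 2_000 parts, 1_500 customers, 15_000 orders, and
            // 10 clerks (each count is floor(sf * multiplier), clamped to >= 1).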
2065
2066            LoadGenerator::Tpch {
2067                count_supplier,
2068                count_part,
2069                count_customer,
2070                count_orders,
2071                count_clerk,
2072            }
2073        }
2074        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2075            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2076            let LoadGeneratorOptionExtracted {
2077                keys,
2078                snapshot_rounds,
2079                transactional_snapshot,
2080                value_size,
2081                tick_interval,
2082                seed,
2083                partitions,
2084                batch_size,
2085                ..
2086            } = extracted;
2087
2088            let mut include_offset = None;
2089            for im in include_metadata {
2090                match im {
2091                    SourceIncludeMetadata::Offset { alias } => {
2092                        include_offset = match alias {
2093                            Some(alias) => Some(alias.to_string()),
2094                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2095                        }
2096                    }
2097                    SourceIncludeMetadata::Key { .. } => continue,
2098
2099                    _ => {
2100                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2101                    }
2102                };
2103            }
2104
2105            let lgkv = KeyValueLoadGenerator {
2106                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2107                snapshot_rounds: snapshot_rounds
2108                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2109                // Defaults to true.
2110                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2111                value_size: value_size
2112                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2113                partitions: partitions
2114                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2115                tick_interval,
2116                batch_size: batch_size
2117                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2118                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2119                include_offset,
2120            };
2121
2122            if lgkv.keys == 0
2123                || lgkv.partitions == 0
2124                || lgkv.value_size == 0
2125                || lgkv.batch_size == 0
2126            {
2127                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2128            }
2129
2130            if lgkv.keys % lgkv.partitions != 0 {
2131                sql_bail!("KEYS must be a multiple of PARTITIONS")
2132            }
2133
2134            if lgkv.batch_size > lgkv.keys {
2135                sql_bail!("KEYS must be at least as large as BATCH SIZE")
2136            }
2137
2138            // This constraint simplifies the source implementation.
2139            // We can lift it later.
2140            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2141                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2142            }
2143
2144            if lgkv.snapshot_rounds == 0 {
2145                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2146            }
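            // Worked example (hypothetical option values): KEYS 128, PARTITIONS 4,
            // VALUE SIZE 100, BATCH SIZE 8, SNAPSHOT ROUNDS 1 passes every check:
            // all values are non-zero, 128 % 4 == 0, 8 <= 128, and (128 / 4) % 8 == 0.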
2147
2148            LoadGenerator::KeyValue(lgkv)
2149        }
2150    };
2151
2152    Ok(load_generator)
2153}
2154
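/// Checks that a value relation has the shape Debezium produces: an `after`
/// column holding the new row and, optionally, a `before` record column of the
/// same record type. (For example, the value schema of a typical Debezium
/// change event is, roughly, `{before: record | null, after: record | null}`.)
/// Returns the column indexes of `before` (if present) and `after`.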
2155fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2156    let before = value_desc.get_by_name(&"before".into());
2157    let (after_idx, after_ty) = value_desc
2158        .get_by_name(&"after".into())
2159        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2160    let before_idx = if let Some((before_idx, before_ty)) = before {
2161        if !matches!(before_ty.scalar_type, ScalarType::Record { .. }) {
2162            sql_bail!("'before' column must be of type record");
2163        }
2164        if before_ty != after_ty {
2165            sql_bail!("'before' type differs from 'after' column");
2166        }
2167        Some(before_idx)
2168    } else {
2169        None
2170    };
2171    Ok((before_idx, after_idx))
2172}
2173
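/// Plans the key/value encodings for a `FORMAT` clause. A bare `FORMAT F`
/// yields a value encoding (plus a key encoding only if the format itself
/// carries one, as an Avro CSR seed can), while `KEY FORMAT K VALUE FORMAT V`
/// plans both sides explicitly; e.g. (sketch) `KEY FORMAT TEXT VALUE FORMAT
/// AVRO ...` is handled by the `FormatSpecifier::KeyValue` arm below.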
2174fn get_encoding(
2175    scx: &StatementContext,
2176    format: &FormatSpecifier<Aug>,
2177    envelope: &ast::SourceEnvelope,
2178) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2179    let encoding = match format {
2180        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2181        FormatSpecifier::KeyValue { key, value } => {
2182            let key = {
2183                let encoding = get_encoding_inner(scx, key)?;
2184                Some(encoding.key.unwrap_or(encoding.value))
2185            };
2186            let value = get_encoding_inner(scx, value)?.value;
2187            SourceDataEncoding { key, value }
2188        }
2189    };
2190
2191    let requires_keyvalue = matches!(
2192        envelope,
2193        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2194    );
2195    let is_keyvalue = encoding.key.is_some();
2196    if requires_keyvalue && !is_keyvalue {
2197        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2198    };
2199
2200    Ok(encoding)
2201}
2202
2203/// Determine the cluster to use for this item.
2204///
2205/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2206/// Because of this, do not normalize/canonicalize the create SQL statement
2207/// until after calling this function.
2208fn source_sink_cluster_config<'a, 'ctx>(
2209    scx: &'a StatementContext<'ctx>,
2210    in_cluster: &mut Option<ResolvedClusterName>,
2211) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2212    let cluster = match in_cluster {
2213        None => {
2214            let cluster = scx.catalog.resolve_cluster(None)?;
2215            *in_cluster = Some(ResolvedClusterName {
2216                id: cluster.id(),
2217                print_name: None,
2218            });
2219            cluster
2220        }
2221        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2222    };
2223
2224    Ok(cluster)
2225}
2226
2227generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2228
2229#[derive(Debug)]
2230pub struct Schema {
2231    pub key_schema: Option<String>,
2232    pub value_schema: String,
2233    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2234    pub confluent_wire_format: bool,
2235}
2236
2237fn get_encoding_inner(
2238    scx: &StatementContext,
2239    format: &Format<Aug>,
2240) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2241    let value = match format {
2242        Format::Bytes => DataEncoding::Bytes,
2243        Format::Avro(schema) => {
2244            let Schema {
2245                key_schema,
2246                value_schema,
2247                csr_connection,
2248                confluent_wire_format,
2249            } = match schema {
2250                // TODO(jldlaughlin): we need a way to pass in primary key information
2251                // when building a source from a string or file.
2252                AvroSchema::InlineSchema {
2253                    schema: ast::Schema { schema },
2254                    with_options,
2255                } => {
2256                    let AvroSchemaOptionExtracted {
2257                        confluent_wire_format,
2258                        ..
2259                    } = with_options.clone().try_into()?;
2260
2261                    Schema {
2262                        key_schema: None,
2263                        value_schema: schema.clone(),
2264                        csr_connection: None,
2265                        confluent_wire_format,
2266                    }
2267                }
2268                AvroSchema::Csr {
2269                    csr_connection:
2270                        CsrConnectionAvro {
2271                            connection,
2272                            seed,
2273                            key_strategy: _,
2274                            value_strategy: _,
2275                        },
2276                } => {
2277                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2278                    let csr_connection = match item.connection()? {
2279                        Connection::Csr(_) => item.id(),
2280                        _ => {
2281                            sql_bail!(
2282                                "{} is not a schema registry connection",
2283                                scx.catalog
2284                                    .resolve_full_name(item.name())
2285                                    .to_string()
2286                                    .quoted()
2287                            )
2288                        }
2289                    };
2290
2291                    if let Some(seed) = seed {
2292                        Schema {
2293                            key_schema: seed.key_schema.clone(),
2294                            value_schema: seed.value_schema.clone(),
2295                            csr_connection: Some(csr_connection),
2296                            confluent_wire_format: true,
2297                        }
2298                    } else {
2299                        unreachable!("CSR seed resolution should already have been called: Avro")
2300                    }
2301                }
2302            };
2303
2304            if let Some(key_schema) = key_schema {
2305                return Ok(SourceDataEncoding {
2306                    key: Some(DataEncoding::Avro(AvroEncoding {
2307                        schema: key_schema,
2308                        csr_connection: csr_connection.clone(),
2309                        confluent_wire_format,
2310                    })),
2311                    value: DataEncoding::Avro(AvroEncoding {
2312                        schema: value_schema,
2313                        csr_connection,
2314                        confluent_wire_format,
2315                    }),
2316                });
2317            } else {
2318                DataEncoding::Avro(AvroEncoding {
2319                    schema: value_schema,
2320                    csr_connection,
2321                    confluent_wire_format,
2322                })
2323            }
2324        }
2325        Format::Protobuf(schema) => match schema {
2326            ProtobufSchema::Csr {
2327                csr_connection:
2328                    CsrConnectionProtobuf {
2329                        connection:
2330                            CsrConnection {
2331                                connection,
2332                                options,
2333                            },
2334                        seed,
2335                    },
2336            } => {
2337                if let Some(CsrSeedProtobuf { key, value }) = seed {
2338                    let item = scx.get_item_by_resolved_name(connection)?;
2339                    let _ = match item.connection()? {
2340                        Connection::Csr(connection) => connection,
2341                        _ => {
2342                            sql_bail!(
2343                                "{} is not a schema registry connection",
2344                                scx.catalog
2345                                    .resolve_full_name(item.name())
2346                                    .to_string()
2347                                    .quoted()
2348                            )
2349                        }
2350                    };
2351
2352                    if !options.is_empty() {
2353                        sql_bail!("Protobuf CSR connections do not support any options");
2354                    }
2355
2356                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2357                        descriptors: strconv::parse_bytes(&value.schema)?,
2358                        message_name: value.message_name.clone(),
2359                        confluent_wire_format: true,
2360                    });
2361                    if let Some(key) = key {
2362                        return Ok(SourceDataEncoding {
2363                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2364                                descriptors: strconv::parse_bytes(&key.schema)?,
2365                                message_name: key.message_name.clone(),
2366                                confluent_wire_format: true,
2367                            })),
2368                            value,
2369                        });
2370                    }
2371                    value
2372                } else {
2373                    unreachable!("CSR seed resolution should already have been called: Proto")
2374                }
2375            }
2376            ProtobufSchema::InlineSchema {
2377                message_name,
2378                schema: ast::Schema { schema },
2379            } => {
2380                let descriptors = strconv::parse_bytes(schema)?;
2381
2382                DataEncoding::Protobuf(ProtobufEncoding {
2383                    descriptors,
2384                    message_name: message_name.to_owned(),
2385                    confluent_wire_format: false,
2386                })
2387            }
2388        },
2389        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2390            regex: mz_repr::adt::regex::Regex::new(regex, false)
2391                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2392        }),
2393        Format::Csv { columns, delimiter } => {
2394            let columns = match columns {
2395                CsvColumns::Header { names } => {
2396                    if names.is_empty() {
2397                        sql_bail!("[internal error] column spec should get names in purify")
2398                    }
2399                    ColumnSpec::Header {
2400                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2401                    }
2402                }
2403                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2404            };
2405            DataEncoding::Csv(CsvEncoding {
2406                columns,
2407                delimiter: u8::try_from(*delimiter)
2408                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2409            })
2410        }
2411        Format::Json { array: false } => DataEncoding::Json,
2412        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2413        Format::Text => DataEncoding::Text,
2414    };
2415    Ok(SourceDataEncoding { key: None, value })
2416}
2417
2418/// Extract the key envelope, if it is requested
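/// (i.e., when an `INCLUDE KEY [AS name]` item is present). For example (a
/// sketch): `INCLUDE KEY AS k` with a planned key format yields
/// `KeyEnvelope::Named("k")`, while a bare `INCLUDE KEY` defers to
/// `get_unnamed_key_envelope` for the default name.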
2419fn get_key_envelope(
2420    included_items: &[SourceIncludeMetadata],
2421    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2422    key_envelope_no_encoding: bool,
2423) -> Result<KeyEnvelope, PlanError> {
2424    let key_definition = included_items
2425        .iter()
2426        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2427    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2428        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2429            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2430            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2431            (Some(name), _) if key_envelope_no_encoding => {
2432                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2433            }
2434            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2435            (_, None) => {
2436                // `alias == None` means `INCLUDE KEY`;
2437                // `alias == Some(_)` means `INCLUDE KEY AS ___`.
2438                // Both cases make sense with the same error message.
2439                sql_bail!(
2440                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2441                        got bare FORMAT"
2442                );
2443            }
2444        }
2445    } else {
2446        Ok(KeyEnvelope::None)
2447    }
2448}
2449
2450/// Gets the key envelope for a given key encoding when no name for the key has
2451/// been requested by the user.
2452fn get_unnamed_key_envelope(
2453    key: Option<&DataEncoding<ReferencedConnection>>,
2454) -> Result<KeyEnvelope, PlanError> {
2455    // If the key is requested but comes from an unnamed type then it gets the name "key".
2456    //
2457    // Otherwise it gets the names of the columns in the type.
2458    let is_composite = match key {
2459        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2460        Some(
2461            DataEncoding::Avro(_)
2462            | DataEncoding::Csv(_)
2463            | DataEncoding::Protobuf(_)
2464            | DataEncoding::Regex { .. },
2465        ) => true,
2466        None => false,
2467    };
2468
2469    if is_composite {
2470        Ok(KeyEnvelope::Flattened)
2471    } else {
2472        Ok(KeyEnvelope::Named("key".to_string()))
2473    }
2474}
2475
2476pub fn describe_create_view(
2477    _: &StatementContext,
2478    _: CreateViewStatement<Aug>,
2479) -> Result<StatementDesc, PlanError> {
2480    Ok(StatementDesc::new(None))
2481}
2482
2483pub fn plan_view(
2484    scx: &StatementContext,
2485    def: &mut ViewDefinition<Aug>,
2486    params: &Params,
2487    temporary: bool,
2488) -> Result<(QualifiedItemName, View), PlanError> {
2489    let create_sql = normalize::create_statement(
2490        scx,
2491        Statement::CreateView(CreateViewStatement {
2492            if_exists: IfExistsBehavior::Error,
2493            temporary,
2494            definition: def.clone(),
2495        }),
2496    )?;
2497
2498    let ViewDefinition {
2499        name,
2500        columns,
2501        query,
2502    } = def;
2503
2504    let query::PlannedRootQuery {
2505        mut expr,
2506        mut desc,
2507        finishing,
2508        scope: _,
2509    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2510    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2511    // Note: Earlier, we considered persisting the finishing information with the view
2512    // here to help with database-issues#236. However, in the meantime, a better approach
2513    // to solving database-issues#236 may have emerged:
2514    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2515    assert!(finishing.is_trivial(expr.arity()));
2516
2517    expr.bind_parameters(params)?;
2518    let dependencies = expr
2519        .depends_on()
2520        .into_iter()
2521        .map(|gid| scx.catalog.resolve_item_id(&gid))
2522        .collect();
2523
2524    let name = if temporary {
2525        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2526    } else {
2527        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2528    };
2529
2530    plan_utils::maybe_rename_columns(
2531        format!("view {}", scx.catalog.resolve_full_name(&name)),
2532        &mut desc,
2533        columns,
2534    )?;
2535    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2536
2537    if let Some(dup) = names.iter().duplicates().next() {
2538        sql_bail!("column {} specified more than once", dup.as_str().quoted());
2539    }
2540
2541    let view = View {
2542        create_sql,
2543        expr,
2544        dependencies,
2545        column_names: names,
2546        temporary,
2547    };
2548
2549    Ok((name, view))
2550}
2551
2552pub fn plan_create_view(
2553    scx: &StatementContext,
2554    mut stmt: CreateViewStatement<Aug>,
2555    params: &Params,
2556) -> Result<Plan, PlanError> {
2557    let CreateViewStatement {
2558        temporary,
2559        if_exists,
2560        definition,
2561    } = &mut stmt;
2562    let (name, view) = plan_view(scx, definition, params, *temporary)?;
2563
2564    // Override the statement-level IfExistsBehavior with Skip if this is
2565    // explicitly requested in the PlanContext (the default is `false`).
2566    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2567
2568    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2569        let if_exists = true;
2570        let cascade = false;
2571        let maybe_item_to_drop = plan_drop_item(
2572            scx,
2573            ObjectType::View,
2574            if_exists,
2575            definition.name.clone(),
2576            cascade,
2577        )?;
2578
2579        // Check if the new View depends on the item that we would be replacing.
2580        if let Some(id) = maybe_item_to_drop {
2581            let dependencies = view.expr.depends_on();
2582            let invalid_drop = scx
2583                .get_item(&id)
2584                .global_ids()
2585                .any(|gid| dependencies.contains(&gid));
2586            if invalid_drop {
2587                let item = scx.catalog.get_item(&id);
2588                sql_bail!(
2589                    "cannot replace view {0}: depended upon by new {0} definition",
2590                    scx.catalog.resolve_full_name(item.name())
2591                );
2592            }
2593
2594            Some(id)
2595        } else {
2596            None
2597        }
2598    } else {
2599        None
2600    };
2601    let drop_ids = replace
2602        .map(|id| {
2603            scx.catalog
2604                .item_dependents(id)
2605                .into_iter()
2606                .map(|id| id.unwrap_item_id())
2607                .collect()
2608        })
2609        .unwrap_or_default();
2610
2611    // Check for an object in the catalog with this same name
2612    let full_name = scx.catalog.resolve_full_name(&name);
2613    let partial_name = PartialItemName::from(full_name.clone());
2614    // For PostgreSQL compatibility, we need to prevent creating views when
2615    // there is an existing object *or* type of the same name.
2616    if let (Ok(item), IfExistsBehavior::Error, false) = (
2617        scx.catalog.resolve_item_or_type(&partial_name),
2618        *if_exists,
2619        ignore_if_exists_errors,
2620    ) {
2621        return Err(PlanError::ItemAlreadyExists {
2622            name: full_name.to_string(),
2623            item_type: item.item_type(),
2624        });
2625    }
2626
2627    Ok(Plan::CreateView(CreateViewPlan {
2628        name,
2629        view,
2630        replace,
2631        drop_ids,
2632        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2633        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2634    }))
2635}
2636
2637pub fn describe_create_materialized_view(
2638    _: &StatementContext,
2639    _: CreateMaterializedViewStatement<Aug>,
2640) -> Result<StatementDesc, PlanError> {
2641    Ok(StatementDesc::new(None))
2642}
2643
2644pub fn describe_create_continual_task(
2645    _: &StatementContext,
2646    _: CreateContinualTaskStatement<Aug>,
2647) -> Result<StatementDesc, PlanError> {
2648    Ok(StatementDesc::new(None))
2649}
2650
2651pub fn describe_create_network_policy(
2652    _: &StatementContext,
2653    _: CreateNetworkPolicyStatement<Aug>,
2654) -> Result<StatementDesc, PlanError> {
2655    Ok(StatementDesc::new(None))
2656}
2657
2658pub fn describe_alter_network_policy(
2659    _: &StatementContext,
2660    _: AlterNetworkPolicyStatement<Aug>,
2661) -> Result<StatementDesc, PlanError> {
2662    Ok(StatementDesc::new(None))
2663}
2664
2665pub fn plan_create_materialized_view(
2666    scx: &StatementContext,
2667    mut stmt: CreateMaterializedViewStatement<Aug>,
2668    params: &Params,
2669) -> Result<Plan, PlanError> {
2670    let cluster_id =
2671        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2672    stmt.in_cluster = Some(ResolvedClusterName {
2673        id: cluster_id,
2674        print_name: None,
2675    });
2676
2677    let create_sql =
2678        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2679
2680    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2681    let name = scx.allocate_qualified_name(partial_name.clone())?;
2682
2683    let query::PlannedRootQuery {
2684        mut expr,
2685        mut desc,
2686        finishing,
2687        scope: _,
2688    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2689    // We get back a trivial finishing, see comment in `plan_view`.
2690    assert!(finishing.is_trivial(expr.arity()));
2691
2692    expr.bind_parameters(params)?;
2693
2694    plan_utils::maybe_rename_columns(
2695        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2696        &mut desc,
2697        &stmt.columns,
2698    )?;
2699    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2700
2701    let MaterializedViewOptionExtracted {
2702        assert_not_null,
2703        partition_by,
2704        retain_history,
2705        refresh,
2706        seen: _,
2707    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2708
2709    if let Some(partition_by) = partition_by {
2710        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2711        check_partition_by(&desc, partition_by)?;
2712    }
2713
2714    let refresh_schedule = {
2715        let mut refresh_schedule = RefreshSchedule::default();
2716        let mut on_commits_seen = 0;
2717        for refresh_option_value in refresh {
2718            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2719                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2720            }
2721            match refresh_option_value {
2722                RefreshOptionValue::OnCommit => {
2723                    on_commits_seen += 1;
2724                }
2725                RefreshOptionValue::AtCreation => {
2726                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2727                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2728                }
2729                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2730                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2731                    let ecx = &ExprContext {
2732                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2733                        name: "REFRESH AT",
2734                        scope: &Scope::empty(),
2735                        relation_type: &RelationType::empty(),
2736                        allow_aggregates: false,
2737                        allow_subqueries: false,
2738                        allow_parameters: false,
2739                        allow_windows: false,
2740                    };
2741                    let hir = plan_expr(ecx, &time)?.cast_to(
2742                        ecx,
2743                        CastContext::Assignment,
2744                        &ScalarType::MzTimestamp,
2745                    )?;
2746                    // (mz_now was purified away to a literal earlier)
2747                    let timestamp = hir
2748                        .into_literal_mz_timestamp()
2749                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2750                    refresh_schedule.ats.push(timestamp);
2751                }
2752                RefreshOptionValue::Every(RefreshEveryOptionValue {
2753                    interval,
2754                    aligned_to,
2755                }) => {
2756                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2757                    if interval.as_microseconds() <= 0 {
2758                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2759                    }
2760                    if interval.months != 0 {
2761                        // This limitation exists because we want intervals to be
2762                        // cleanly convertible to a Unix epoch timestamp difference.
2763                        // That no longer holds once an interval involves months,
2764                        // because months have variable lengths. See `Timestamp::round_up`.
2765                        sql_bail!("REFRESH interval must not involve units larger than days");
2766                    }
2767                    let interval = interval.duration()?;
2768                    if u64::try_from(interval.as_millis()).is_err() {
2769                        sql_bail!("REFRESH interval too large");
2770                    }
2771
2772                    let mut aligned_to = match aligned_to {
2773                        Some(aligned_to) => aligned_to,
2774                        None => {
2775                            soft_panic_or_log!(
2776                                "ALIGNED TO should have been filled in by purification"
2777                            );
2778                            sql_bail!(
2779                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2780                            )
2781                        }
2782                    };
2783
2784                    // Desugar the `aligned_to` expression
2785                    transform_ast::transform(scx, &mut aligned_to)?;
2786
2787                    let ecx = &ExprContext {
2788                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2789                        name: "REFRESH EVERY ... ALIGNED TO",
2790                        scope: &Scope::empty(),
2791                        relation_type: &RelationType::empty(),
2792                        allow_aggregates: false,
2793                        allow_subqueries: false,
2794                        allow_parameters: false,
2795                        allow_windows: false,
2796                    };
2797                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2798                        ecx,
2799                        CastContext::Assignment,
2800                        &ScalarType::MzTimestamp,
2801                    )?;
2802                    // (mz_now was purified away to a literal earlier)
2803                    let aligned_to_const = aligned_to_hir
2804                        .into_literal_mz_timestamp()
2805                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2806
2807                    refresh_schedule.everies.push(RefreshEvery {
2808                        interval,
2809                        aligned_to: aligned_to_const,
2810                    });
2811                }
2812            }
2813        }
2814
2815        if on_commits_seen > 1 {
2816            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2817        }
2818        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2819            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2820        }
2821
2822        if refresh_schedule == RefreshSchedule::default() {
2823            None
2824        } else {
2825            Some(refresh_schedule)
2826        }
2827    };
2828
2829    let as_of = stmt.as_of.map(Timestamp::from);
2830    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2831    let mut non_null_assertions = assert_not_null
2832        .into_iter()
2833        .map(normalize::column_name)
2834        .map(|assertion_name| {
2835            column_names
2836                .iter()
2837                .position(|col| col == &assertion_name)
2838                .ok_or_else(|| {
2839                    sql_err!(
2840                        "column {} in ASSERT NOT NULL option not found",
2841                        assertion_name.as_str().quoted()
2842                    )
2843                })
2844        })
2845        .collect::<Result<Vec<_>, _>>()?;
2846    non_null_assertions.sort();
2847    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2848        let dup = &column_names[*dup];
2849        sql_bail!(
2850            "duplicate column {} in non-null assertions",
2851            dup.as_str().quoted()
2852        );
2853    }
2854
2855    if let Some(dup) = column_names.iter().duplicates().next() {
2856        sql_bail!("column {} specified more than once", dup.as_str().quoted());
2857    }
2858
2859    // Override the statement-level IfExistsBehavior with Skip if this is
2860    // explicitly requested in the PlanContext (the default is `false`).
2861    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2862        Ok(true) => IfExistsBehavior::Skip,
2863        _ => stmt.if_exists,
2864    };
2865
2866    let mut replace = None;
2867    let mut if_not_exists = false;
2868    match if_exists {
2869        IfExistsBehavior::Replace => {
2870            let if_exists = true;
2871            let cascade = false;
2872            let replace_id = plan_drop_item(
2873                scx,
2874                ObjectType::MaterializedView,
2875                if_exists,
2876                partial_name.into(),
2877                cascade,
2878            )?;
2879
2880            // Check whether the new materialized view depends on the item that we would be replacing.
2881            if let Some(id) = replace_id {
2882                let dependencies = expr.depends_on();
2883                let invalid_drop = scx
2884                    .get_item(&id)
2885                    .global_ids()
2886                    .any(|gid| dependencies.contains(&gid));
2887                if invalid_drop {
2888                    let item = scx.catalog.get_item(&id);
2889                    sql_bail!(
2890                        "cannot replace materialized view {0}: depended upon by new {0} definition",
2891                        scx.catalog.resolve_full_name(item.name())
2892                    );
2893                }
2894                replace = Some(id);
2895            }
2896        }
2897        IfExistsBehavior::Skip => if_not_exists = true,
2898        IfExistsBehavior::Error => (),
2899    }
2900    let drop_ids = replace
2901        .map(|id| {
2902            scx.catalog
2903                .item_dependents(id)
2904                .into_iter()
2905                .map(|id| id.unwrap_item_id())
2906                .collect()
2907        })
2908        .unwrap_or_default();
2909    let dependencies = expr
2910        .depends_on()
2911        .into_iter()
2912        .map(|gid| scx.catalog.resolve_item_id(&gid))
2913        .collect();
2914
2915    // Check for an object in the catalog with this same name
2916    let full_name = scx.catalog.resolve_full_name(&name);
2917    let partial_name = PartialItemName::from(full_name.clone());
2918    // For PostgreSQL compatibility, we need to prevent creating materialized
2919    // views when there is an existing object *or* type of the same name.
2920    if let (IfExistsBehavior::Error, Ok(item)) =
2921        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
2922    {
2923        return Err(PlanError::ItemAlreadyExists {
2924            name: full_name.to_string(),
2925            item_type: item.item_type(),
2926        });
2927    }
2928
2929    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
2930        name,
2931        materialized_view: MaterializedView {
2932            create_sql,
2933            expr,
2934            dependencies,
2935            column_names,
2936            cluster_id,
2937            non_null_assertions,
2938            compaction_window,
2939            refresh_schedule,
2940            as_of,
2941        },
2942        replace,
2943        drop_ids,
2944        if_not_exists,
2945        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2946    }))
2947}
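
// A hedged, self-contained sketch of the REFRESH EVERY interval rules enforced
// above, restated as a free function so the invariants are visible in isolation.
// `validate_refresh_interval` is a hypothetical helper for illustration, not part
// of the planner; it relies only on the `Interval` accessors already used above.
#[cfg(test)]
mod refresh_every_interval_sketch {
    use std::time::Duration;

    use mz_repr::adt::interval::Interval;

    /// The interval must be positive, must not involve months (months have
    /// variable lengths, so they don't convert cleanly to an epoch difference),
    /// and must fit in a `u64` of milliseconds.
    fn validate_refresh_interval(interval: Interval) -> Result<Duration, String> {
        if interval.as_microseconds() <= 0 {
            return Err(format!("REFRESH interval must be positive; got: {}", interval));
        }
        if interval.months != 0 {
            return Err("REFRESH interval must not involve units larger than days".into());
        }
        let duration = interval.duration().map_err(|e| e.to_string())?;
        if u64::try_from(duration.as_millis()).is_err() {
            return Err("REFRESH interval too large".into());
        }
        Ok(duration)
    }

    #[test]
    fn months_are_rejected() {
        let one_month = Interval {
            months: 1,
            days: 0,
            micros: 0,
        };
        assert!(validate_refresh_interval(one_month).is_err());
    }
}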
2948
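// `generate_extracted_config!` derives a `MaterializedViewOptionExtracted` struct
// (one field per option, plus the `seen` set) and a `TryFrom` impl that rejects
// options specified more than once unless they are marked `AllowMultiple`; the
// hand-written `CsrConfigOptionExtracted` below shows the equivalent expanded
// shape. The destructuring in `plan_create_materialized_view` above relies on
// exactly this.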
2949generate_extracted_config!(
2950    MaterializedViewOption,
2951    (AssertNotNull, Ident, AllowMultiple),
2952    (PartitionBy, Vec<Ident>),
2953    (RetainHistory, OptionalDuration),
2954    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
2955);
2956
2957pub fn plan_create_continual_task(
2958    scx: &StatementContext,
2959    mut stmt: CreateContinualTaskStatement<Aug>,
2960    params: &Params,
2961) -> Result<Plan, PlanError> {
2962    match &stmt.sugar {
2963        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
2964        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
2965            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
2966        }
2967        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
2968            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
2969        }
2970    };
2971    let cluster_id = match &stmt.in_cluster {
2972        None => scx.catalog.resolve_cluster(None)?.id(),
2973        Some(in_cluster) => in_cluster.id,
2974    };
2975    stmt.in_cluster = Some(ResolvedClusterName {
2976        id: cluster_id,
2977        print_name: None,
2978    });
2979
2980    let create_sql =
2981        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
2982
2983    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
2984
2985    // It seems desirable for a CT that e.g. simply filters the input to keep
2986    // the same nullability. So, start by assuming all columns are non-nullable,
2987    // and then make them nullable below if any of the exprs plan them as
2988    // nullable.
2989    let mut desc = match stmt.columns {
2990        None => None,
2991        Some(columns) => {
2992            let mut desc_columns = Vec::with_capacity(columns.len());
2993            for col in columns.iter() {
2994                desc_columns.push((
2995                    normalize::column_name(col.name.clone()),
2996                    ColumnType {
2997                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
2998                        nullable: false,
2999                    },
3000                ));
3001            }
3002            Some(RelationDesc::from_names_and_types(desc_columns))
3003        }
3004    };
3005    let input = scx.get_item_by_resolved_name(&stmt.input)?;
3006    match input.item_type() {
3007        // The input must be an object directly backed by a persist shard, so we
3008        // can use a persist listen to rehydrate it efficiently.
3009        CatalogItemType::ContinualTask
3010        | CatalogItemType::Table
3011        | CatalogItemType::MaterializedView
3012        | CatalogItemType::Source => {}
3013        CatalogItemType::Sink
3014        | CatalogItemType::View
3015        | CatalogItemType::Index
3016        | CatalogItemType::Type
3017        | CatalogItemType::Func
3018        | CatalogItemType::Secret
3019        | CatalogItemType::Connection => {
3020            sql_bail!(
3021                "CONTINUAL TASK cannot use {} as an input",
3022                input.item_type()
3023            );
3024        }
3025    }
3026
3027    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3028    let ct_name = stmt.name;
3029    let placeholder_id = match &ct_name {
3030        ResolvedItemName::ContinualTask { id, name } => {
3031            let desc = match desc.as_ref().cloned() {
3032                Some(x) => x,
3033                None => {
3034                    // The user didn't specify the CT's columns. Take a wild
3035                    // guess that the CT has the same shape as the input. It's
3036                    // fine if this is wrong; we'll get an error below after
3037                    // planning the query.
3038                    let input_name = scx.catalog.resolve_full_name(input.name());
3039                    input.desc(&input_name)?.into_owned()
3040                }
3041            };
3042            qcx.ctes.insert(
3043                *id,
3044                CteDesc {
3045                    name: name.item.clone(),
3046                    desc,
3047                },
3048            );
3049            Some(*id)
3050        }
3051        _ => None,
3052    };
3053
3054    let mut exprs = Vec::new();
3055    for (idx, stmt) in stmt.stmts.iter().enumerate() {
3056        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3057        let query::PlannedRootQuery {
3058            mut expr,
3059            desc: desc_query,
3060            finishing,
3061            scope: _,
3062        } = query::plan_ct_query(&mut qcx, query)?;
3063        // We get back a trivial finishing because we plan with a "maintained"
3064        // QueryLifetime; see the comment in `plan_view`.
3065        assert!(finishing.is_trivial(expr.arity()));
3066        expr.bind_parameters(params)?;
3067        let expr = match desc.as_mut() {
3068            None => {
3069                desc = Some(desc_query);
3070                expr
3071            }
3072            Some(desc) => {
3073                // We specify the columns for DELETE ourselves, so if any column
3074                // counts don't match, the offending statement must be an INSERT.
3075                if desc_query.arity() > desc.arity() {
3076                    sql_bail!(
3077                        "statement {}: INSERT has more expressions than target columns",
3078                        idx
3079                    );
3080                }
3081                if desc_query.arity() < desc.arity() {
3082                    sql_bail!(
3083                        "statement {}: INSERT has more target columns than expressions",
3084                        idx
3085                    );
3086                }
3087                // Ensure the types of the source query match the types of the
3088                // target CT, installing assignment casts where necessary and possible.
3089                let target_types = desc.iter_types().map(|x| &x.scalar_type);
3090                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3091                let expr = expr.map_err(|e| {
3092                    sql_err!(
3093                        "statement {}: column {} is of type {} but expression is of type {}",
3094                        idx,
3095                        desc.get_name(e.column).as_str().quoted(),
3096                        qcx.humanize_scalar_type(&e.target_type, false),
3097                        qcx.humanize_scalar_type(&e.source_type, false),
3098                    )
3099                })?;
3100
3101                // Update the CT's nullability as necessary. The arity checks above
3102                // verified that the two column lists have the same length.
3103                let zip_types = || desc.iter_types().zip(desc_query.iter_types());
3104                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3105                if updated {
3106                    let new_types = zip_types().map(|(ct, q)| {
3107                        let mut ct = ct.clone();
3108                        if q.nullable {
3109                            ct.nullable = true;
3110                        }
3111                        ct
3112                    });
3113                    *desc = RelationDesc::from_names_and_types(
3114                        desc.iter_names().cloned().zip(new_types),
3115                    );
3116                }
3117
3118                expr
3119            }
3120        };
3121        match stmt {
3122            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3123            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3124        }
3125    }
3126    // TODO(ct3): Collect things by output and assert that there is only one (or
3127    // support multiple outputs).
3128    let expr = exprs
3129        .into_iter()
3130        .reduce(|acc, expr| acc.union(expr))
3131        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3132    let dependencies = expr
3133        .depends_on()
3134        .into_iter()
3135        .map(|gid| scx.catalog.resolve_item_id(&gid))
3136        .collect();
3137
3138    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3139    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3140    if let Some(dup) = column_names.iter().duplicates().next() {
3141        sql_bail!("column {} specified more than once", dup.as_str().quoted());
3142    }
3143
3144    // Check for an object in the catalog with this same name
3145    let name = match &ct_name {
3146        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3147        ResolvedItemName::ContinualTask { name, .. } => {
3148            let name = scx.allocate_qualified_name(name.clone())?;
3149            let full_name = scx.catalog.resolve_full_name(&name);
3150            let partial_name = PartialItemName::from(full_name.clone());
3151            // For PostgreSQL compatibility, we need to prevent creating this when there
3152            // is an existing object *or* type of the same name.
3153            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3154                return Err(PlanError::ItemAlreadyExists {
3155                    name: full_name.to_string(),
3156                    item_type: item.item_type(),
3157                });
3158            }
3159            name
3160        }
3161        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3162        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3163    };
3164
3165    let as_of = stmt.as_of.map(Timestamp::from);
3166    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3167        name,
3168        placeholder_id,
3169        desc,
3170        input_id: input.global_id(),
3171        with_snapshot: snapshot.unwrap_or(true),
3172        continual_task: MaterializedView {
3173            create_sql,
3174            expr,
3175            dependencies,
3176            column_names,
3177            cluster_id,
3178            non_null_assertions: Vec::new(),
3179            compaction_window: None,
3180            refresh_schedule: None,
3181            as_of,
3182        },
3183    }))
3184}
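
// A hedged, std-only sketch of the nullability widening performed above: a
// continual task keeps its declared column order and types, but a column becomes
// nullable as soon as any contributing INSERT/DELETE query can produce a NULL for
// it. Plain `bool`s stand in for `ColumnType::nullable`, for illustration only.
#[cfg(test)]
mod ct_nullability_sketch {
    #[test]
    fn widening_is_monotonic() {
        // One bool per column: `true` means nullable.
        let ct = [false, true, false];
        let query = [true, false, false];
        let widened: Vec<bool> = ct.iter().zip(query).map(|(ct, q)| *ct || q).collect();
        assert_eq!(widened, vec![true, true, false]);
    }
}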
3185
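/// Extracts the maintainable query embedded in a `ContinualTaskStmt`, or `None`
/// if the statement uses a feature (explicit columns, `RETURNING`, `USING`,
/// `DEFAULT VALUES`) that has no continual task equivalent.
///
/// Illustrative example: `DELETE FROM ct WHERE a < 5` becomes the equivalent of
/// `SELECT * FROM ct WHERE a < 5`, which the caller then negates so the matching
/// rows turn into retractions; an `INSERT INTO ct SELECT ...` contributes its
/// `SELECT` as-is.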
3186fn continual_task_query<'a>(
3187    ct_name: &ResolvedItemName,
3188    stmt: &'a ast::ContinualTaskStmt<Aug>,
3189) -> Option<ast::Query<Aug>> {
3190    match stmt {
3191        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3192            table_name: _,
3193            columns,
3194            source,
3195            returning,
3196        }) => {
3197            if !columns.is_empty() || !returning.is_empty() {
3198                return None;
3199            }
3200            match source {
3201                ast::InsertSource::Query(query) => Some(query.clone()),
3202                ast::InsertSource::DefaultValues => None,
3203            }
3204        }
3205        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3206            table_name: _,
3207            alias,
3208            using,
3209            selection,
3210        }) => {
3211            if !using.is_empty() {
3212                return None;
3213            }
3214            // Construct a `SELECT *` with the `DELETE` selection as a `WHERE`.
3215            let from = ast::TableWithJoins {
3216                relation: ast::TableFactor::Table {
3217                    name: ct_name.clone(),
3218                    alias: alias.clone(),
3219                },
3220                joins: Vec::new(),
3221            };
3222            let select = ast::Select {
3223                from: vec![from],
3224                selection: selection.clone(),
3225                distinct: None,
3226                projection: vec![ast::SelectItem::Wildcard],
3227                group_by: Vec::new(),
3228                having: None,
3229                qualify: None,
3230                options: Vec::new(),
3231            };
3232            let query = ast::Query {
3233                ctes: ast::CteBlock::Simple(Vec::new()),
3234                body: ast::SetExpr::Select(Box::new(select)),
3235                order_by: Vec::new(),
3236                limit: None,
3237                offset: None,
3238            };
3239            // Then negate it to turn it into retractions (after planning it).
3240            Some(query)
3241        }
3242    }
3243}
3244
3245generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3246
3247pub fn describe_create_sink(
3248    _: &StatementContext,
3249    _: CreateSinkStatement<Aug>,
3250) -> Result<StatementDesc, PlanError> {
3251    Ok(StatementDesc::new(None))
3252}
3253
3254generate_extracted_config!(
3255    CreateSinkOption,
3256    (Snapshot, bool),
3257    (PartitionStrategy, String),
3258    (Version, u64)
3259);
3260
3261pub fn plan_create_sink(
3262    scx: &StatementContext,
3263    stmt: CreateSinkStatement<Aug>,
3264) -> Result<Plan, PlanError> {
3265    // Check for an object in the catalog with this same name
3266    let Some(name) = stmt.name.clone() else {
3267        return Err(PlanError::MissingName(CatalogItemType::Sink));
3268    };
3269    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3270    let full_name = scx.catalog.resolve_full_name(&name);
3271    let partial_name = PartialItemName::from(full_name.clone());
3272    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3273        return Err(PlanError::ItemAlreadyExists {
3274            name: full_name.to_string(),
3275            item_type: item.item_type(),
3276        });
3277    }
3278
3279    plan_sink(scx, stmt)
3280}
3281
3282/// Plans a sink as if it did not exist in the catalog, so that the planning logic
3283/// can be shared by both `CREATE SINK` and `ALTER SINK`. It is the responsibility
3284/// of the callers (`plan_create_sink` and `plan_alter_sink`) to check for name
3285/// collisions where that matters.
3286fn plan_sink(
3287    scx: &StatementContext,
3288    mut stmt: CreateSinkStatement<Aug>,
3289) -> Result<Plan, PlanError> {
3290    let CreateSinkStatement {
3291        name,
3292        in_cluster: _,
3293        from,
3294        connection,
3295        format,
3296        envelope,
3297        if_not_exists,
3298        with_options,
3299    } = stmt.clone();
3300
3301    let Some(name) = name else {
3302        return Err(PlanError::MissingName(CatalogItemType::Sink));
3303    };
3304    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3305
3306    let envelope = match envelope {
3307        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3308        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3309        None => sql_bail!("ENVELOPE clause is required"),
3310    };
3311
3312    let from_name = &from;
3313    let from = scx.get_item_by_resolved_name(&from)?;
3314    if from.id().is_system() {
3315        bail_unsupported!("creating a sink directly on a catalog object");
3316    }
3317    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
3318    let key_indices = match &connection {
3319        CreateSinkConnection::Kafka { key, .. } => {
3320            if let Some(key) = key.clone() {
3321                let key_columns = key
3322                    .key_columns
3323                    .into_iter()
3324                    .map(normalize::column_name)
3325                    .collect::<Vec<_>>();
3326                let mut uniq = BTreeSet::new();
3327                for col in key_columns.iter() {
3328                    if !uniq.insert(col) {
3329                        sql_bail!("duplicate column referenced in KEY: {}", col);
3330                    }
3331                }
3332                let indices = key_columns
3333                    .iter()
3334                    .map(|col| -> anyhow::Result<usize> {
3335                        let name_idx =
3336                            desc.get_by_name(col)
3337                                .map(|(idx, _type)| idx)
3338                                .ok_or_else(|| {
3339                                    sql_err!("column referenced in KEY does not exist: {}", col)
3340                                })?;
3341                        if desc.get_unambiguous_name(name_idx).is_none() {
3342                            return Err(sql_err!("column referenced in KEY is ambiguous: {}", col).into());
3343                        }
3344                        Ok(name_idx)
3345                    })
3346                    .collect::<Result<Vec<_>, _>>()?;
3347                let is_valid_key =
3348                    desc.typ().keys.iter().any(|key_columns| {
3349                        key_columns.iter().all(|column| indices.contains(column))
3350                    });
3351
3352                if !is_valid_key && envelope == SinkEnvelope::Upsert {
3353                    if key.not_enforced {
3354                        scx.catalog
3355                            .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3356                                key: key_columns.clone(),
3357                                name: name.item.clone(),
3358                            })
3359                    } else {
3360                        return Err(PlanError::UpsertSinkWithInvalidKey {
3361                            name: from_name.full_name_str(),
3362                            desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3363                            valid_keys: desc
3364                                .typ()
3365                                .keys
3366                                .iter()
3367                                .map(|key| {
3368                                    key.iter()
3369                                        .map(|col| desc.get_name(*col).as_str().into())
3370                                        .collect()
3371                                })
3372                                .collect(),
3373                        });
3374                    }
3375                }
3376                Some(indices)
3377            } else {
3378                None
3379            }
3380        }
3381    };
3382
3383    let headers_index = match &connection {
3384        CreateSinkConnection::Kafka {
3385            headers: Some(headers),
3386            ..
3387        } => {
3388            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3389
3390            match envelope {
3391                SinkEnvelope::Upsert => (),
3392                SinkEnvelope::Debezium => {
3393                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3394                }
3395            };
3396
3397            let headers = normalize::column_name(headers.clone());
3398            let (idx, ty) = desc
3399                .get_by_name(&headers)
3400                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3401
3402            if desc.get_unambiguous_name(idx).is_none() {
3403                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3404            }
3405
3406            match &ty.scalar_type {
3407                ScalarType::Map { value_type, .. }
3408                    if matches!(&**value_type, ScalarType::String | ScalarType::Bytes) => {}
3409                _ => sql_bail!(
3410                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3411                ),
3412            }
3413
3414            Some(idx)
3415        }
3416        _ => None,
3417    };
3418
3419    // Pick the first valid natural relation key, if any.
3420    let relation_key_indices = desc.typ().keys.get(0).cloned();
3421
3422    let key_desc_and_indices = key_indices.map(|key_indices| {
3423        let cols = desc
3424            .iter()
3425            .map(|(name, ty)| (name.clone(), ty.clone()))
3426            .collect::<Vec<_>>();
3427        let (names, types): (Vec<_>, Vec<_>) =
3428            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3429        let typ = RelationType::new(types);
3430        (RelationDesc::new(typ, names), key_indices)
3431    });
3432
3433    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3434        return Err(PlanError::UpsertSinkWithoutKey);
3435    }
3436
3437    let connection_builder = match connection {
3438        CreateSinkConnection::Kafka {
3439            connection,
3440            options,
3441            ..
3442        } => kafka_sink_builder(
3443            scx,
3444            connection,
3445            options,
3446            format,
3447            relation_key_indices,
3448            key_desc_and_indices,
3449            headers_index,
3450            desc.into_owned(),
3451            envelope,
3452            from.id(),
3453        )?,
3454    };
3455
3456    let CreateSinkOptionExtracted {
3457        snapshot,
3458        version,
3459        partition_strategy: _,
3460        seen: _,
3461    } = with_options.try_into()?;
3462
3463    // WITH SNAPSHOT defaults to true
3464    let with_snapshot = snapshot.unwrap_or(true);
3465    // VERSION defaults to 0
3466    let version = version.unwrap_or(0);
3467
3468    // If no cluster was provided, we will rewrite the statement to name one, so
3469    // we must resolve the `in_cluster` value before canonicalizing the create
3470    // statement below.
3471    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3472    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3473
3474    Ok(Plan::CreateSink(CreateSinkPlan {
3475        name,
3476        sink: Sink {
3477            create_sql,
3478            from: from.global_id(),
3479            connection: connection_builder,
3480            envelope,
3481            version,
3482        },
3483        with_snapshot,
3484        if_not_exists,
3485        in_cluster: in_cluster.id(),
3486    }))
3487}
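
// A hedged, std-only sketch of the upsert key check in `plan_sink` above: a
// user-supplied KEY is acceptable when some existing unique key of the relation
// is a subset of the chosen columns (column indices stand in for names).
// `is_valid_key` is a hypothetical restatement for illustration only.
#[cfg(test)]
mod upsert_key_sketch {
    fn is_valid_key(relation_keys: &[Vec<usize>], chosen: &[usize]) -> bool {
        relation_keys
            .iter()
            .any(|key| key.iter().all(|col| chosen.contains(col)))
    }

    #[test]
    fn superset_of_a_unique_key_is_valid() {
        let keys = vec![vec![0], vec![1, 2]];
        assert!(is_valid_key(&keys, &[1, 2, 3]));
        assert!(!is_valid_key(&keys, &[2, 3]));
    }
}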
3488
3489fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3490    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3491
3492    let existing_keys = desc
3493        .typ()
3494        .keys
3495        .iter()
3496        .map(|key_columns| {
3497            key_columns
3498                .iter()
3499                .map(|col| desc.get_name(*col).as_str())
3500                .join(", ")
3501        })
3502        .join(", ");
3503
3504    sql_err!(
3505        "Key constraint ({}) conflicts with existing key ({})",
3506        user_keys,
3507        existing_keys
3508    )
3509}
3510
3511/// Written by hand instead of via the `generate_extracted_config!` macro because
3512/// the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>.
3513#[derive(Debug, Default, PartialEq, Clone)]
3514pub struct CsrConfigOptionExtracted {
3515    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3516    pub(crate) avro_key_fullname: Option<String>,
3517    pub(crate) avro_value_fullname: Option<String>,
3518    pub(crate) null_defaults: bool,
3519    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3520    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3521    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3522    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3523}
3524
3525impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3526    type Error = crate::plan::PlanError;
3527    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3528        let mut extracted = CsrConfigOptionExtracted::default();
3529        let mut common_doc_comments = BTreeMap::new();
3530        for option in v {
3531            if !extracted.seen.insert(option.name.clone()) {
3532                return Err(PlanError::Unstructured({
3533                    format!("{} specified more than once", option.name)
3534                }));
3535            }
3536            let option_name = option.name.clone();
3537            let option_name_str = option_name.to_ast_string_simple();
3538            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3539                option_name: option_name.to_ast_string_simple(),
3540                err: e.into(),
3541            };
3542            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3543                val.map(|s| match s {
3544                    WithOptionValue::Value(Value::String(s)) => {
3545                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3546                    }
3547                    _ => Err("must be a string".to_string()),
3548                })
3549                .transpose()
3550                .map_err(PlanError::Unstructured)
3551                .map_err(better_error)
3552            };
3553            match option.name {
3554                CsrConfigOptionName::AvroKeyFullname => {
3555                    extracted.avro_key_fullname =
3556                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3557                }
3558                CsrConfigOptionName::AvroValueFullname => {
3559                    extracted.avro_value_fullname =
3560                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3561                }
3562                CsrConfigOptionName::NullDefaults => {
3563                    extracted.null_defaults =
3564                        <bool>::try_from_value(option.value).map_err(better_error)?;
3565                }
3566                CsrConfigOptionName::AvroDocOn(doc_on) => {
3567                    let value = String::try_from_value(option.value.ok_or_else(|| {
3568                        PlanError::InvalidOptionValue {
3569                            option_name: option_name_str,
3570                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3571                        }
3572                    })?)
3573                    .map_err(better_error)?;
3574                    let key = match doc_on.identifier {
3575                        DocOnIdentifier::Column(ast::ColumnName {
3576                            relation: ResolvedItemName::Item { id, .. },
3577                            column: ResolvedColumnReference::Column { name, index: _ },
3578                        }) => DocTarget::Field {
3579                            object_id: id,
3580                            column_name: name,
3581                        },
3582                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3583                            DocTarget::Type(id)
3584                        }
3585                        _ => unreachable!(),
3586                    };
3587
3588                    match doc_on.for_schema {
3589                        DocOnSchema::KeyOnly => {
3590                            extracted.key_doc_options.insert(key, value);
3591                        }
3592                        DocOnSchema::ValueOnly => {
3593                            extracted.value_doc_options.insert(key, value);
3594                        }
3595                        DocOnSchema::All => {
3596                            common_doc_comments.insert(key, value);
3597                        }
3598                    }
3599                }
3600                CsrConfigOptionName::KeyCompatibilityLevel => {
3601                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3602                }
3603                CsrConfigOptionName::ValueCompatibilityLevel => {
3604                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3605                }
3606            }
3607        }
3608
3609        for (key, value) in common_doc_comments {
3610            if !extracted.key_doc_options.contains_key(&key) {
3611                extracted.key_doc_options.insert(key.clone(), value.clone());
3612            }
3613            if !extracted.value_doc_options.contains_key(&key) {
3614                extracted.value_doc_options.insert(key, value);
3615            }
3616        }
3617        Ok(extracted)
3618    }
3619}
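
// A hedged, std-only sketch of the `DOC ON` precedence implemented above:
// comments that apply to both schemas act as defaults, while KEY- or
// VALUE-specific comments win. `merge_defaults` is a hypothetical restatement
// over string maps; the real code merges `BTreeMap<DocTarget, String>`s for the
// key and value schemas separately.
#[cfg(test)]
mod doc_on_precedence_sketch {
    use std::collections::BTreeMap;

    fn merge_defaults(
        specific: &mut BTreeMap<&'static str, &'static str>,
        common: &BTreeMap<&'static str, &'static str>,
    ) {
        for (target, doc) in common {
            specific.entry(*target).or_insert(*doc);
        }
    }

    #[test]
    fn specific_wins_over_common() {
        let mut key_docs = BTreeMap::from([("col_a", "key-specific doc")]);
        let common = BTreeMap::from([("col_a", "shared doc"), ("col_b", "shared doc")]);
        merge_defaults(&mut key_docs, &common);
        assert_eq!(key_docs["col_a"], "key-specific doc"); // specific wins
        assert_eq!(key_docs["col_b"], "shared doc"); // common fills the gap
    }
}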
3620
3621fn kafka_sink_builder(
3622    scx: &StatementContext,
3623    connection: ResolvedItemName,
3624    options: Vec<KafkaSinkConfigOption<Aug>>,
3625    format: Option<FormatSpecifier<Aug>>,
3626    relation_key_indices: Option<Vec<usize>>,
3627    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3628    headers_index: Option<usize>,
3629    value_desc: RelationDesc,
3630    envelope: SinkEnvelope,
3631    sink_from: CatalogItemId,
3632) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3633    // Get Kafka connection.
3634    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3635    let connection_id = connection_item.id();
3636    match connection_item.connection()? {
3637        Connection::Kafka(_) => (),
3638        _ => sql_bail!(
3639            "{} is not a kafka connection",
3640            scx.catalog.resolve_full_name(connection_item.name())
3641        ),
3642    };
3643
3644    let KafkaSinkConfigOptionExtracted {
3645        topic,
3646        compression_type,
3647        partition_by,
3648        progress_group_id_prefix,
3649        transactional_id_prefix,
3650        legacy_ids,
3651        topic_config,
3652        topic_metadata_refresh_interval,
3653        topic_partition_count,
3654        topic_replication_factor,
3655        seen: _,
3656    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3657
3658    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3659        (Some(_), Some(true)) => {
3660            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3661        }
3662        (None, Some(true)) => KafkaIdStyle::Legacy,
3663        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3664    };
3665
3666    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3667        (Some(_), Some(true)) => {
3668            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3669        }
3670        (None, Some(true)) => KafkaIdStyle::Legacy,
3671        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3672    };
3673
3674    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3675
3676    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3677        // This is a librdkafka-enforced restriction that, if violated,
3678        // would result in a runtime error for the sink.
3679        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3680    }
3681
3682    let assert_positive = |val: Option<i32>, name: &str| {
3683        if let Some(val) = val {
3684            if val <= 0 {
3685                sql_bail!("{} must be a positive integer", name);
3686            }
3687        }
3688        val.map(NonNeg::try_from)
3689            .transpose()
3690            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3691    };
3692    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3693    let topic_replication_factor =
3694        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3695
3696    // Helper closure to parse Avro connection options for format specifiers that
3697    // use Avro for either key or value encoding.
3698    let gen_avro_schema_options = |conn| {
3699        let CsrConnectionAvro {
3700            connection:
3701                CsrConnection {
3702                    connection,
3703                    options,
3704                },
3705            seed,
3706            key_strategy,
3707            value_strategy,
3708        } = conn;
3709        if seed.is_some() {
3710            sql_bail!("SEED option does not make sense with sinks");
3711        }
3712        if key_strategy.is_some() {
3713            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3714        }
3715        if value_strategy.is_some() {
3716            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3717        }
3718
3719        let item = scx.get_item_by_resolved_name(&connection)?;
3720        let csr_connection = match item.connection()? {
3721            Connection::Csr(_) => item.id(),
3722            _ => {
3723                sql_bail!(
3724                    "{} is not a schema registry connection",
3725                    scx.catalog
3726                        .resolve_full_name(item.name())
3727                        .to_string()
3728                        .quoted()
3729                )
3730            }
3731        };
3732        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3733
3734        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3735            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3736        }
3737
3738        if key_desc_and_indices.is_some()
3739            && (extracted_options.avro_key_fullname.is_some()
3740                ^ extracted_options.avro_value_fullname.is_some())
3741        {
3742            sql_bail!(
3743                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3744            );
3745        }
3746
3747        Ok((csr_connection, extracted_options))
3748    };
3749
3750    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3751        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3752        Format::Bytes if desc.arity() == 1 => {
3753            let col_type = &desc.typ().column_types[0].scalar_type;
3754            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3755                bail_unsupported!(format!(
3756                    "BYTES format with non-encodable type: {:?}",
3757                    col_type
3758                ));
3759            }
3760
3761            Ok(KafkaSinkFormatType::Bytes)
3762        }
3763        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3764        Format::Bytes | Format::Text => {
3765            bail_unsupported!("BYTES or TEXT format with multiple columns")
3766        }
3767        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3768        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3769            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3770            let schema = if is_key {
3771                AvroSchemaGenerator::new(
3772                    desc.clone(),
3773                    false,
3774                    options.key_doc_options,
3775                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3776                    options.null_defaults,
3777                    Some(sink_from),
3778                    false,
3779                )?
3780                .schema()
3781                .to_string()
3782            } else {
3783                AvroSchemaGenerator::new(
3784                    desc.clone(),
3785                    matches!(envelope, SinkEnvelope::Debezium),
3786                    options.value_doc_options,
3787                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3788                    options.null_defaults,
3789                    Some(sink_from),
3790                    true,
3791                )?
3792                .schema()
3793                .to_string()
3794            };
3795            Ok(KafkaSinkFormatType::Avro {
3796                schema,
3797                compatibility_level: if is_key {
3798                    options.key_compatibility_level
3799                } else {
3800                    options.value_compatibility_level
3801                },
3802                csr_connection,
3803            })
3804        }
3805        format => bail_unsupported!(format!("sink format {:?}", format)),
3806    };
3807
3808    let partition_by = match &partition_by {
3809        Some(partition_by) => {
3810            let mut scope = Scope::from_source(None, value_desc.iter_names());
3811
3812            match envelope {
3813                SinkEnvelope::Upsert => (),
3814                SinkEnvelope::Debezium => {
3815                    let key_indices: HashSet<_> = key_desc_and_indices
3816                        .as_ref()
3817                        .map(|(_desc, indices)| indices.as_slice())
3818                        .unwrap_or_default()
3819                        .into_iter()
3820                        .collect();
3821                    for (i, item) in scope.items.iter_mut().enumerate() {
3822                        if !key_indices.contains(&i) {
3823                            item.error_if_referenced = Some(|_table, column| {
3824                                PlanError::InvalidPartitionByEnvelopeDebezium {
3825                                    column_name: column.to_string(),
3826                                }
3827                            });
3828                        }
3829                    }
3830                }
3831            };
3832
3833            let ecx = &ExprContext {
3834                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3835                name: "PARTITION BY",
3836                scope: &scope,
3837                relation_type: value_desc.typ(),
3838                allow_aggregates: false,
3839                allow_subqueries: false,
3840                allow_parameters: false,
3841                allow_windows: false,
3842            };
3843            let expr = plan_expr(ecx, partition_by)?.cast_to(
3844                ecx,
3845                CastContext::Assignment,
3846                &ScalarType::UInt64,
3847            )?;
3848            let expr = expr.lower_uncorrelated()?;
3849
3850            Some(expr)
3851        }
3852        _ => None,
3853    };
3854
3855    // Map from the format specifier of the statement to the individual key/value formats for the sink.
3856    let format = match format {
3857        Some(FormatSpecifier::KeyValue { key, value }) => {
3858            let key_format = match key_desc_and_indices.as_ref() {
3859                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
3860                None => None,
3861            };
3862            KafkaSinkFormat {
3863                value_format: map_format(value, &value_desc, false)?,
3864                key_format,
3865            }
3866        }
3867        Some(FormatSpecifier::Bare(format)) => {
3868            let key_format = match key_desc_and_indices.as_ref() {
3869                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
3870                None => None,
3871            };
3872            KafkaSinkFormat {
3873                value_format: map_format(format, &value_desc, false)?,
3874                key_format,
3875            }
3876        }
3877        None => bail_unsupported!("sink without format"),
3878    };
3879
3880    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
3881        connection_id,
3882        connection: connection_id,
3883        format,
3884        topic: topic_name,
3885        relation_key_indices,
3886        key_desc_and_indices,
3887        headers_index,
3888        value_desc,
3889        partition_by,
3890        compression_type,
3891        progress_group_id,
3892        transactional_id,
3893        topic_options: KafkaTopicOptions {
3894            partition_count: topic_partition_count,
3895            replication_factor: topic_replication_factor,
3896            topic_config: topic_config.unwrap_or_default(),
3897        },
3898        topic_metadata_refresh_interval,
3899    }))
3900}
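
// A hedged, std-only sketch of the ID-style resolution in `kafka_sink_builder`
// above: LEGACY IDS is mutually exclusive with an explicit prefix and otherwise
// forces the legacy naming, while any other combination carries the optional
// prefix through. `IdStyle` and `resolve` are local stand-ins for `KafkaIdStyle`
// and the two `match`es above, for illustration only.
#[cfg(test)]
mod kafka_id_style_sketch {
    #[derive(Debug, PartialEq)]
    enum IdStyle {
        Legacy,
        Prefix(Option<String>),
    }

    fn resolve(prefix: Option<String>, legacy_ids: Option<bool>) -> Result<IdStyle, String> {
        match (prefix, legacy_ids) {
            (Some(_), Some(true)) => {
                Err("LEGACY IDS cannot be used at the same time as a prefix".into())
            }
            (None, Some(true)) => Ok(IdStyle::Legacy),
            (prefix, _) => Ok(IdStyle::Prefix(prefix)),
        }
    }

    #[test]
    fn legacy_conflicts_with_prefix() {
        assert!(resolve(Some("p".into()), Some(true)).is_err());
        assert_eq!(resolve(None, Some(true)), Ok(IdStyle::Legacy));
        assert_eq!(
            resolve(Some("p".into()), None),
            Ok(IdStyle::Prefix(Some("p".into())))
        );
    }
}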
3901
3902pub fn describe_create_index(
3903    _: &StatementContext,
3904    _: CreateIndexStatement<Aug>,
3905) -> Result<StatementDesc, PlanError> {
3906    Ok(StatementDesc::new(None))
3907}
3908
3909pub fn plan_create_index(
3910    scx: &StatementContext,
3911    mut stmt: CreateIndexStatement<Aug>,
3912) -> Result<Plan, PlanError> {
3913    let CreateIndexStatement {
3914        name,
3915        on_name,
3916        in_cluster,
3917        key_parts,
3918        with_options,
3919        if_not_exists,
3920    } = &mut stmt;
3921    let on = scx.get_item_by_resolved_name(on_name)?;
3922    let on_item_type = on.item_type();
3923
3924    if !matches!(
3925        on_item_type,
3926        CatalogItemType::View
3927            | CatalogItemType::MaterializedView
3928            | CatalogItemType::Source
3929            | CatalogItemType::Table
3930    ) {
3931        sql_bail!(
3932            "index cannot be created on {} because it is a {}",
3933            on_name.full_name_str(),
3934            on.item_type()
3935        )
3936    }
3937
3938    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
3939
3940    let filled_key_parts = match key_parts {
3941        Some(kp) => kp.to_vec(),
3942        None => {
3943            // `key_parts` is None if we're creating a "default" index.
3944            on.desc(&scx.catalog.resolve_full_name(on.name()))?
3945                .typ()
3946                .default_key()
3947                .iter()
3948                .map(|i| match on_desc.get_unambiguous_name(*i) {
3949                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
3950                    _ => Expr::Value(Value::Number((i + 1).to_string())),
3951                })
3952                .collect()
3953        }
3954    };
3955    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
3956
3957    let index_name = if let Some(name) = name {
3958        QualifiedItemName {
3959            qualifiers: on.name().qualifiers.clone(),
3960            item: normalize::ident(name.clone()),
3961        }
3962    } else {
3963        let mut idx_name = QualifiedItemName {
3964            qualifiers: on.name().qualifiers.clone(),
3965            item: on.name().item.clone(),
3966        };
3967        if key_parts.is_none() {
3968            // We're trying to create the "default" index.
3969            idx_name.item += "_primary_idx";
3970        } else {
3971            // Use PG's scheme for automatically naming indexes:
3972            // `<table>_<_-separated indexed expressions>_idx`
3973            let index_name_col_suffix = keys
3974                .iter()
3975                .map(|k| match k {
3976                    mz_expr::MirScalarExpr::Column(i) => match on_desc.get_unambiguous_name(*i) {
3977                        Some(col_name) => col_name.to_string(),
3978                        None => format!("{}", i + 1),
3979                    },
3980                    _ => "expr".to_string(),
3981                })
3982                .join("_");
3983            write!(idx_name.item, "_{index_name_col_suffix}_idx")
3984                .expect("write on strings cannot fail");
3985            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
3986        }
3987
3988        if !*if_not_exists {
3989            scx.catalog.find_available_name(idx_name)
3990        } else {
3991            idx_name
3992        }
3993    };
3994
3995    // Check for an object in the catalog with this same name
3996    let full_name = scx.catalog.resolve_full_name(&index_name);
3997    let partial_name = PartialItemName::from(full_name.clone());
3998    // For PostgreSQL compatibility, we need to prevent creating indexes when
3999    // there is an existing object *or* type of the same name.
4000    //
4001    // Technically, we only need to prevent coexistence of indexes and types
4002    // that have an associated relation (record types but not list/map types).
4003    // Enforcing that would be more complicated, though. It's backwards
4004    // compatible to weaken this restriction in the future.
4005    if let (Ok(item), false, false) = (
4006        scx.catalog.resolve_item_or_type(&partial_name),
4007        *if_not_exists,
4008        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4009    ) {
4010        return Err(PlanError::ItemAlreadyExists {
4011            name: full_name.to_string(),
4012            item_type: item.item_type(),
4013        });
4014    }
4015
4016    let options = plan_index_options(scx, with_options.clone())?;
4017    let cluster_id = match in_cluster {
4018        None => scx.resolve_cluster(None)?.id(),
4019        Some(in_cluster) => in_cluster.id,
4020    };
4021
4022    *in_cluster = Some(ResolvedClusterName {
4023        id: cluster_id,
4024        print_name: None,
4025    });
4026
4027    // Normalize `stmt`.
4028    *name = Some(Ident::new(index_name.item.clone())?);
4029    *key_parts = Some(filled_key_parts);
4030    let if_not_exists = *if_not_exists;
4031
4032    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4033    let compaction_window = options.iter().find_map(|o| {
4034        #[allow(irrefutable_let_patterns)]
4035        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4036            Some(lcw.clone())
4037        } else {
4038            None
4039        }
4040    });
4041
4042    Ok(Plan::CreateIndex(CreateIndexPlan {
4043        name: index_name,
4044        index: Index {
4045            create_sql,
4046            on: on.global_id(),
4047            keys,
4048            cluster_id,
4049            compaction_window,
4050        },
4051        if_not_exists,
4052    }))
4053}
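
// A hedged, std-only sketch of the automatic index naming above, which follows
// PostgreSQL's `<table>_<_-separated indexed expressions>_idx` scheme;
// non-column expressions contribute the literal `expr`, and a default index gets
// `<table>_primary_idx` instead.
#[cfg(test)]
mod index_name_sketch {
    #[test]
    fn expression_keys_are_flattened_into_the_name() {
        let table = "t";
        // Column keys use the column name; an expression key becomes "expr".
        let parts = ["a", "expr", "b"];
        let name = format!("{}_{}_idx", table, parts.join("_"));
        assert_eq!(name, "t_a_expr_b_idx");
    }
}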
4054
4055pub fn describe_create_type(
4056    _: &StatementContext,
4057    _: CreateTypeStatement<Aug>,
4058) -> Result<StatementDesc, PlanError> {
4059    Ok(StatementDesc::new(None))
4060}
4061
4062pub fn plan_create_type(
4063    scx: &StatementContext,
4064    stmt: CreateTypeStatement<Aug>,
4065) -> Result<Plan, PlanError> {
4066    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4067    let CreateTypeStatement { name, as_type, .. } = stmt;
4068
4069    fn validate_data_type(
4070        scx: &StatementContext,
4071        data_type: ResolvedDataType,
4072        as_type: &str,
4073        key: &str,
4074    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4075        let (id, modifiers) = match data_type {
4076            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4077            _ => sql_bail!(
4078                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4079                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4080                as_type,
4081                key,
4082                data_type.human_readable_name(),
4083            ),
4084        };
4085
4086        let item = scx.catalog.get_item(&id);
4087        match item.type_details() {
4088            None => sql_bail!(
4089                "{} must be of class type, but received {} which is of class {}",
4090                key,
4091                scx.catalog.resolve_full_name(item.name()),
4092                item.item_type()
4093            ),
4094            Some(CatalogTypeDetails {
4095                typ: CatalogType::Char,
4096                ..
4097            }) => {
4098                bail_unsupported!("embedding char type in a list or map")
4099            }
4100            _ => {
4101                // Validate that the modifiers are actually valid.
4102                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4103
4104                Ok((id, modifiers))
4105            }
4106        }
4107    }
4108
4109    let inner = match as_type {
4110        CreateTypeAs::List { options } => {
4111            let CreateTypeListOptionExtracted {
4112                element_type,
4113                seen: _,
4114            } = CreateTypeListOptionExtracted::try_from(options)?;
4115            let element_type =
4116                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4117            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4118            CatalogType::List {
4119                element_reference: id,
4120                element_modifiers: modifiers,
4121            }
4122        }
4123        CreateTypeAs::Map { options } => {
4124            let CreateTypeMapOptionExtracted {
4125                key_type,
4126                value_type,
4127                seen: _,
4128            } = CreateTypeMapOptionExtracted::try_from(options)?;
4129            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4130            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4131            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4132            let (value_id, value_modifiers) =
4133                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4134            CatalogType::Map {
4135                key_reference: key_id,
4136                key_modifiers,
4137                value_reference: value_id,
4138                value_modifiers,
4139            }
4140        }
4141        CreateTypeAs::Record { column_defs } => {
4142            let mut fields = vec![];
4143            for column_def in column_defs {
4144                let data_type = column_def.data_type;
4145                let key = ident(column_def.name.clone());
4146                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4147                fields.push(CatalogRecordField {
4148                    name: ColumnName::from(key.clone()),
4149                    type_reference: id,
4150                    type_modifiers: modifiers,
4151                });
4152            }
4153            CatalogType::Record { fields }
4154        }
4155    };
4156
4157    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4158
4159    // Check for an object in the catalog with this same name
4160    let full_name = scx.catalog.resolve_full_name(&name);
4161    let partial_name = PartialItemName::from(full_name.clone());
4162    // For PostgreSQL compatibility, we need to prevent creating types when
4163    // there is an existing object *or* type of the same name.
4164    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4165        if item.item_type().conflicts_with_type() {
4166            return Err(PlanError::ItemAlreadyExists {
4167                name: full_name.to_string(),
4168                item_type: item.item_type(),
4169            });
4170        }
4171    }
4172
4173    Ok(Plan::CreateType(CreateTypePlan {
4174        name,
4175        typ: Type { create_sql, inner },
4176    }))
4177}
4178
4179generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4180
4181generate_extracted_config!(
4182    CreateTypeMapOption,
4183    (KeyType, ResolvedDataType),
4184    (ValueType, ResolvedDataType)
4185);
4186
4187#[derive(Debug)]
4188pub enum PlannedAlterRoleOption {
4189    Attributes(PlannedRoleAttributes),
4190    Variable(PlannedRoleVariable),
4191}
4192
4193#[derive(Debug, Clone)]
4194pub struct PlannedRoleAttributes {
4195    pub inherit: Option<bool>,
4196    pub password: Option<Password>,
4197    /// `nopassword` is set to true if the password from the parser is `None`.
4198    /// This is semantically different from not supplying a password at all,
4199    /// as it allows unsetting a password.
4200    pub nopassword: Option<bool>,
4201    pub superuser: Option<bool>,
4202    pub login: Option<bool>,
4203}
4204
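/// Plans the attribute list shared by `CREATE ROLE` and `ALTER ROLE`.
///
/// A sketch of what is accepted and rejected (role names are hypothetical;
/// `PASSWORD` availability depends on the deployment's authentication setup):
///
/// ```sql
/// CREATE ROLE data_eng INHERIT;        -- ok
/// CREATE ROLE svc PASSWORD 'hunter2';  -- ok where password auth is enabled
/// CREATE ROLE admin CREATEDB;          -- rejected: use system privileges instead
/// ```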
4205fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4206    let mut planned_attributes = PlannedRoleAttributes {
4207        inherit: None,
4208        password: None,
4209        superuser: None,
4210        login: None,
4211        nopassword: None,
4212    };
4213
4214    for option in options {
4215        match option {
4216            RoleAttribute::Inherit | RoleAttribute::NoInherit
4217                if planned_attributes.inherit.is_some() =>
4218            {
4219                sql_bail!("conflicting or redundant options");
4220            }
4221            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4222                bail_never_supported!(
4223                    "CREATECLUSTER attribute",
4224                    "sql/create-role/#details",
4225                    "Use system privileges instead."
4226                );
4227            }
4228            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4229                bail_never_supported!(
4230                    "CREATEDB attribute",
4231                    "sql/create-role/#details",
4232                    "Use system privileges instead."
4233                );
4234            }
4235            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4236                bail_never_supported!(
4237                    "CREATEROLE attribute",
4238                    "sql/create-role/#details",
4239                    "Use system privileges instead."
4240                );
4241            }
4242            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4243                sql_bail!("conflicting or redundant options");
4244            }
4245
4246            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4247            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4248            RoleAttribute::Password(password) => {
4249                if let Some(password) = password {
4250                    planned_attributes.password = Some(password.into());
4251                } else {
4252                    planned_attributes.nopassword = Some(true);
4253                }
4254            }
4255            RoleAttribute::SuperUser => {
4256                if planned_attributes.superuser == Some(false) {
4257                    sql_bail!("conflicting or redundant options");
4258                }
4259                planned_attributes.superuser = Some(true);
4260            }
4261            RoleAttribute::NoSuperUser => {
4262                if planned_attributes.superuser == Some(true) {
4263                    sql_bail!("conflicting or redundant options");
4264                }
4265                planned_attributes.superuser = Some(false);
4266            }
4267            RoleAttribute::Login => {
4268                if planned_attributes.login == Some(false) {
4269                    sql_bail!("conflicting or redundant options");
4270                }
4271                planned_attributes.login = Some(true);
4272            }
4273            RoleAttribute::NoLogin => {
4274                if planned_attributes.login == Some(true) {
4275                    sql_bail!("conflicting or redundant options");
4276                }
4277                planned_attributes.login = Some(false);
4278            }
4279        }
4280    }
4281    if planned_attributes.inherit == Some(false) {
4282        bail_unsupported!("non inherit roles");
4283    }
4284
4285    Ok(planned_attributes)
4286}
4287
4288#[derive(Debug)]
4289pub enum PlannedRoleVariable {
4290    Set { name: String, value: VariableValue },
4291    Reset { name: String },
4292}
4293
4294impl PlannedRoleVariable {
4295    pub fn name(&self) -> &str {
4296        match self {
4297            PlannedRoleVariable::Set { name, .. } => name,
4298            PlannedRoleVariable::Reset { name } => name,
4299        }
4300    }
4301}
4302
4303fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4304    let plan = match variable {
4305        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4306            name: name.to_string(),
4307            value: scl::plan_set_variable_to(value)?,
4308        },
4309        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4310            name: name.to_string(),
4311        },
4312    };
4313    Ok(plan)
4314}
4315
4316pub fn describe_create_role(
4317    _: &StatementContext,
4318    _: CreateRoleStatement,
4319) -> Result<StatementDesc, PlanError> {
4320    Ok(StatementDesc::new(None))
4321}
4322
4323pub fn plan_create_role(
4324    _: &StatementContext,
4325    CreateRoleStatement { name, options }: CreateRoleStatement,
4326) -> Result<Plan, PlanError> {
4327    let attributes = plan_role_attributes(options)?;
4328    Ok(Plan::CreateRole(CreateRolePlan {
4329        name: normalize::ident(name),
4330        attributes: attributes.into(),
4331    }))
4332}
4333
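/// Plans `CREATE NETWORK POLICY`.
///
/// For example (policy name, rule names, and addresses are illustrative):
///
/// ```sql
/// CREATE NETWORK POLICY office_access_policy (
///     RULES (
///         new_york (ACTION = 'allow', DIRECTION = 'ingress', ADDRESS = '8.2.3.4/28'),
///         minnesota (ACTION = 'allow', DIRECTION = 'ingress', ADDRESS = '2.3.4.5/32')
///     )
/// );
/// ```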
4334pub fn plan_create_network_policy(
4335    ctx: &StatementContext,
4336    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4337) -> Result<Plan, PlanError> {
4338    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4339    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4340
4341    let Some(rule_defs) = policy_options.rules else {
4342        sql_bail!("RULES must be specified when creating network policies.");
4343    };
4344
4345    let mut rules = vec![];
4346    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4347        let NetworkPolicyRuleOptionExtracted {
4348            seen: _,
4349            direction,
4350            action,
4351            address,
4352        } = options.try_into()?;
4353        let (direction, action, address) = match (direction, action, address) {
4354            (Some(direction), Some(action), Some(address)) => (
4355                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4356                NetworkPolicyRuleAction::try_from(action.as_str())?,
4357                PolicyAddress::try_from(address.as_str())?,
4358            ),
4359            (_, _, _) => {
4360                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4361            }
4362        };
4363        rules.push(NetworkPolicyRule {
4364            name: normalize::ident(name),
4365            direction,
4366            action,
4367            address,
4368        });
4369    }
4370
4371    if rules.len()
4372        > ctx
4373            .catalog
4374            .system_vars()
4375            .max_rules_per_network_policy()
4376            .try_into()?
4377    {
4378        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4379    }
4380
4381    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4382        name: normalize::ident(name),
4383        rules,
4384    }))
4385}
4386
4387pub fn plan_alter_network_policy(
4388    ctx: &StatementContext,
4389    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4390) -> Result<Plan, PlanError> {
4391    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4392
4393    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4394    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4395
4396    let Some(rule_defs) = policy_options.rules else {
4397        sql_bail!("RULES must be specified when altering network policies.");
4398    };
4399
4400    let mut rules = vec![];
4401    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4402        let NetworkPolicyRuleOptionExtracted {
4403            seen: _,
4404            direction,
4405            action,
4406            address,
4407        } = options.try_into()?;
4408
4409        let (direction, action, address) = match (direction, action, address) {
4410            (Some(direction), Some(action), Some(address)) => (
4411                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4412                NetworkPolicyRuleAction::try_from(action.as_str())?,
4413                PolicyAddress::try_from(address.as_str())?,
4414            ),
4415            (_, _, _) => {
4416                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4417            }
4418        };
4419        rules.push(NetworkPolicyRule {
4420            name: normalize::ident(name),
4421            direction,
4422            action,
4423            address,
4424        });
4425    }
4426    if rules.len()
4427        > ctx
4428            .catalog
4429            .system_vars()
4430            .max_rules_per_network_policy()
4431            .try_into()?
4432    {
4433        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4434    }
4435
4436    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4437        id: policy.id(),
4438        name: normalize::ident(name),
4439        rules,
4440    }))
4441}
4442
4443pub fn describe_create_cluster(
4444    _: &StatementContext,
4445    _: CreateClusterStatement<Aug>,
4446) -> Result<StatementDesc, PlanError> {
4447    Ok(StatementDesc::new(None))
4448}
4449
4450// WARNING:
4451// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4452// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4453// that option stays the same. If you were to give a default value here, then not giving that option
4454// to ALTER CLUSTER would always reset the value of that option to the default.
4455generate_extracted_config!(
4456    ClusterOption,
4457    (AvailabilityZones, Vec<String>),
4458    (Disk, bool),
4459    (IntrospectionDebugging, bool),
4460    (IntrospectionInterval, OptionalDuration),
4461    (Managed, bool),
4462    (Replicas, Vec<ReplicaDefinition<Aug>>),
4463    (ReplicationFactor, u32),
4464    (Size, String),
4465    (Schedule, ClusterScheduleOptionValue),
4466    (WorkloadClass, OptionalString)
4467);
4468
4469generate_extracted_config!(
4470    NetworkPolicyOption,
4471    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4472);
4473
4474generate_extracted_config!(
4475    NetworkPolicyRuleOption,
4476    (Direction, String),
4477    (Action, String),
4478    (Address, String)
4479);
4480
4481generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4482
4483generate_extracted_config!(
4484    ClusterAlterUntilReadyOption,
4485    (Timeout, Duration),
4486    (OnTimeout, String)
4487);
4488
4489generate_extracted_config!(
4490    ClusterFeature,
4491    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4492    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4493    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4494    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4495    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4496    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4497    (
4498        EnableProjectionPushdownAfterRelationCse,
4499        Option<bool>,
4500        Default(None)
4501    )
4502);
4503
4504/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4505///
4506/// The reverse of [`unplan_create_cluster`].
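///
/// For example, a managed cluster (the size name is deployment-specific) and
/// an unmanaged cluster with explicitly listed replicas:
///
/// ```sql
/// CREATE CLUSTER prod (SIZE = '100cc', REPLICATION FACTOR = 2);
/// CREATE CLUSTER dev REPLICAS (r1 (SIZE = '100cc'));
/// ```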
4507pub fn plan_create_cluster(
4508    scx: &StatementContext,
4509    stmt: CreateClusterStatement<Aug>,
4510) -> Result<Plan, PlanError> {
4511    let plan = plan_create_cluster_inner(scx, stmt)?;
4512
4513    // Roundtrip through unplan and make sure that we end up with the same plan.
4514    if let CreateClusterVariant::Managed(_) = &plan.variant {
4515        let stmt = unplan_create_cluster(scx, plan.clone())
4516            .map_err(|e| PlanError::Replan(e.to_string()))?;
4517        let create_sql = stmt.to_ast_string_stable();
4518        let stmt = parse::parse(&create_sql)
4519            .map_err(|e| PlanError::Replan(e.to_string()))?
4520            .into_element()
4521            .ast;
4522        let (stmt, _resolved_ids) =
4523            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4524        let stmt = match stmt {
4525            Statement::CreateCluster(stmt) => stmt,
4526            stmt => {
4527                return Err(PlanError::Replan(format!(
4528                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4529                )));
4530            }
4531        };
4532        let replan =
4533            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4534        if plan != replan {
4535            return Err(PlanError::Replan(format!(
4536                "replan does not match: plan={plan:?}, replan={replan:?}"
4537            )));
4538        }
4539    }
4540
4541    Ok(Plan::CreateCluster(plan))
4542}
4543
4544pub fn plan_create_cluster_inner(
4545    scx: &StatementContext,
4546    CreateClusterStatement {
4547        name,
4548        options,
4549        features,
4550    }: CreateClusterStatement<Aug>,
4551) -> Result<CreateClusterPlan, PlanError> {
4552    let ClusterOptionExtracted {
4553        availability_zones,
4554        introspection_debugging,
4555        introspection_interval,
4556        managed,
4557        replicas,
4558        replication_factor,
4559        seen: _,
4560        size,
4561        disk: disk_in,
4562        schedule,
4563        workload_class,
4564    }: ClusterOptionExtracted = options.try_into()?;
4565
4566    let managed = managed.unwrap_or_else(|| replicas.is_none());
4567
4568    if !scx.catalog.active_role_id().is_system() {
4569        if !features.is_empty() {
4570            sql_bail!("FEATURES not supported for non-system users");
4571        }
4572        if workload_class.is_some() {
4573            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4574        }
4575    }
4576
4577    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4578    let workload_class = workload_class.and_then(|v| v.0);
4579
4580    if managed {
4581        if replicas.is_some() {
4582            sql_bail!("REPLICAS not supported for managed clusters");
4583        }
4584        let Some(size) = size else {
4585            sql_bail!("SIZE must be specified for managed clusters");
4586        };
4587
4588        let mut disk_default = scx.catalog.system_vars().disk_cluster_replicas_default();
4589        // HACK(benesch): disk is always enabled for v2 cluster sizes, and it
4590        // is an error to specify `DISK = FALSE` or `DISK = TRUE` explicitly.
4591        //
4592        // The long term plan is to phase out the v1 cluster sizes, at which
4593        // point we'll be able to remove the `DISK` option entirely and simply
4594        // always enable disk.
4595        if scx.catalog.is_cluster_size_cc(&size) {
4596            if disk_in == Some(false) {
4597                sql_bail!(
4598                    "DISK option disabled is not supported for non-legacy cluster sizes because disk is always enabled"
4599                );
4600            }
4601            disk_default = true;
4602        }
4603        // Only require the feature flag if `DISK` was explicitly specified and it does not match
4604        // the default value.
4605        if matches!(disk_in, Some(disk) if disk != disk_default) {
4606            scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
4607        }
4608        let disk = disk_in.unwrap_or(disk_default);
4609
4610        let compute = plan_compute_replica_config(
4611            introspection_interval,
4612            introspection_debugging.unwrap_or(false),
4613        )?;
4614
4615        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4616            replication_factor.unwrap_or_else(|| {
4617                scx.catalog
4618                    .system_vars()
4619                    .default_cluster_replication_factor()
4620            })
4621        } else {
4622            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4623            if replication_factor.is_some() {
4624                sql_bail!(
4625                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4626                );
4627            }
4628            // If we have a non-trivial schedule, then let's not have any replicas initially,
4629            // to avoid quickly going back and forth if the schedule doesn't want a replica
4630            // right away.
4631            0
4632        };
4633        let availability_zones = availability_zones.unwrap_or_default();
4634
4635        if !availability_zones.is_empty() {
4636            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4637        }
4638
4639        // Plan OptimizerFeatureOverrides.
4640        let ClusterFeatureExtracted {
4641            reoptimize_imported_views,
4642            enable_eager_delta_joins,
4643            enable_new_outer_join_lowering,
4644            enable_variadic_left_join_lowering,
4645            enable_letrec_fixpoint_analysis,
4646            enable_join_prioritize_arranged,
4647            enable_projection_pushdown_after_relation_cse,
4648            seen: _,
4649        } = ClusterFeatureExtracted::try_from(features)?;
4650        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4651            reoptimize_imported_views,
4652            enable_eager_delta_joins,
4653            enable_new_outer_join_lowering,
4654            enable_variadic_left_join_lowering,
4655            enable_letrec_fixpoint_analysis,
4656            enable_join_prioritize_arranged,
4657            enable_projection_pushdown_after_relation_cse,
4658            ..Default::default()
4659        };
4660
4661        let schedule = plan_cluster_schedule(schedule)?;
4662
4663        Ok(CreateClusterPlan {
4664            name: normalize::ident(name),
4665            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4666                replication_factor,
4667                size,
4668                availability_zones,
4669                compute,
4670                disk,
4671                optimizer_feature_overrides,
4672                schedule,
4673            }),
4674            workload_class,
4675        })
4676    } else {
4677        let Some(replica_defs) = replicas else {
4678            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4679        };
4680        if availability_zones.is_some() {
4681            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4682        }
4683        if replication_factor.is_some() {
4684            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4685        }
4686        if introspection_debugging.is_some() {
4687            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4688        }
4689        if introspection_interval.is_some() {
4690            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4691        }
4692        if size.is_some() {
4693            sql_bail!("SIZE not supported for unmanaged clusters");
4694        }
4695        if disk_in.is_some() {
4696            sql_bail!("DISK not supported for unmanaged clusters");
4697        }
4698        if !features.is_empty() {
4699            sql_bail!("FEATURES not supported for unmanaged clusters");
4700        }
4701        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4702            sql_bail!(
4703                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4704            );
4705        }
4706
4707        let mut replicas = vec![];
4708        for ReplicaDefinition { name, options } in replica_defs {
4709            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4710        }
4711
4712        Ok(CreateClusterPlan {
4713            name: normalize::ident(name),
4714            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4715            workload_class,
4716        })
4717    }
4718}
4719
4720/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4721///
4722/// The reverse of [`plan_create_cluster`].
4723pub fn unplan_create_cluster(
4724    scx: &StatementContext,
4725    CreateClusterPlan {
4726        name,
4727        variant,
4728        workload_class,
4729    }: CreateClusterPlan,
4730) -> Result<CreateClusterStatement<Aug>, PlanError> {
4731    match variant {
4732        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4733            replication_factor,
4734            size,
4735            availability_zones,
4736            compute,
4737            disk,
4738            optimizer_feature_overrides,
4739            schedule,
4740        }) => {
4741            let schedule = unplan_cluster_schedule(schedule);
4742            let OptimizerFeatureOverrides {
4743                enable_consolidate_after_union_negate: _,
4744                enable_reduce_mfp_fusion: _,
4745                enable_cardinality_estimates: _,
4746                persist_fast_path_limit: _,
4747                reoptimize_imported_views,
4748                enable_eager_delta_joins,
4749                enable_new_outer_join_lowering,
4750                enable_variadic_left_join_lowering,
4751                enable_letrec_fixpoint_analysis,
4752                enable_reduce_reduction: _,
4753                enable_join_prioritize_arranged,
4754                enable_projection_pushdown_after_relation_cse,
4755                enable_less_reduce_in_eqprop: _,
4756                enable_dequadratic_eqprop_map: _,
4757            } = optimizer_feature_overrides;
4758            // The ones from above that don't occur below are not wired up to cluster features.
4759            let features_extracted = ClusterFeatureExtracted {
4760                // Seen is ignored when unplanning.
4761                seen: Default::default(),
4762                reoptimize_imported_views,
4763                enable_eager_delta_joins,
4764                enable_new_outer_join_lowering,
4765                enable_variadic_left_join_lowering,
4766                enable_letrec_fixpoint_analysis,
4767                enable_join_prioritize_arranged,
4768                enable_projection_pushdown_after_relation_cse,
4769            };
4770            let features = features_extracted.into_values(scx.catalog);
4771            let availability_zones = if availability_zones.is_empty() {
4772                None
4773            } else {
4774                Some(availability_zones)
4775            };
4776            let (introspection_interval, introspection_debugging) =
4777                unplan_compute_replica_config(compute);
4778            // Replication factor cannot be explicitly specified with a refresh schedule; it's
4779            // always 1 or less.
4780            let replication_factor = match &schedule {
4781                ClusterScheduleOptionValue::Manual => Some(replication_factor),
4782                ClusterScheduleOptionValue::Refresh { .. } => {
4783                    assert!(
4784                        replication_factor <= 1,
4785                        "replication factor, {replication_factor:?}, must be <= 1"
4786                    );
4787                    None
4788                }
4789            };
4790            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4791            let options_extracted = ClusterOptionExtracted {
4792                // Seen is ignored when unplanning.
4793                seen: Default::default(),
4794                availability_zones,
4795                disk: Some(disk),
4796                introspection_debugging: Some(introspection_debugging),
4797                introspection_interval,
4798                managed: Some(true),
4799                replicas: None,
4800                replication_factor,
4801                size: Some(size),
4802                schedule: Some(schedule),
4803                workload_class,
4804            };
4805            let options = options_extracted.into_values(scx.catalog);
4806            let name = Ident::new_unchecked(name);
4807            Ok(CreateClusterStatement {
4808                name,
4809                options,
4810                features,
4811            })
4812        }
4813        CreateClusterVariant::Unmanaged(_) => {
4814            bail_unsupported!("SHOW CREATE for unmanaged clusters")
4815        }
4816    }
4817}
4818
4819generate_extracted_config!(
4820    ReplicaOption,
4821    (AvailabilityZone, String),
4822    (BilledAs, String),
4823    (ComputeAddresses, Vec<String>),
4824    (ComputectlAddresses, Vec<String>),
4825    (Disk, bool),
4826    (Internal, bool, Default(false)),
4827    (IntrospectionDebugging, bool, Default(false)),
4828    (IntrospectionInterval, OptionalDuration),
4829    (Size, String),
4830    (StorageAddresses, Vec<String>),
4831    (StoragectlAddresses, Vec<String>),
4832    (Workers, u16)
4833);
4834
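/// Plans the option list of a single replica definition.
///
/// Orchestrated replicas are declared by size, for example (the size name is
/// deployment-specific):
///
/// ```sql
/// CREATE CLUSTER c REPLICAS (r1 (SIZE = '100cc'));
/// ```
///
/// Unorchestrated replicas instead take explicit address lists (`STORAGECTL
/// ADDRESSES`, `STORAGE ADDRESSES`, `COMPUTECTL ADDRESSES`, `COMPUTE
/// ADDRESSES`, `WORKERS`) and are gated behind an unsafe-mode feature flag.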
4835fn plan_replica_config(
4836    scx: &StatementContext,
4837    options: Vec<ReplicaOption<Aug>>,
4838) -> Result<ReplicaConfig, PlanError> {
4839    let ReplicaOptionExtracted {
4840        availability_zone,
4841        billed_as,
4842        compute_addresses,
4843        computectl_addresses,
4844        disk: disk_in,
4845        internal,
4846        introspection_debugging,
4847        introspection_interval,
4848        size,
4849        storage_addresses,
4850        storagectl_addresses,
4851        workers,
4852        ..
4853    }: ReplicaOptionExtracted = options.try_into()?;
4854
4855    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4856
4857    if disk_in.is_some() {
4858        scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
4859    }
4860
4861    match (
4862        size,
4863        availability_zone,
4864        billed_as,
4865        storagectl_addresses,
4866        storage_addresses,
4867        computectl_addresses,
4868        compute_addresses,
4869        workers,
4870    ) {
4871        // Common cases we expect end users to hit.
4872        (None, _, None, None, None, None, None, None) => {
4873            // We don't mention the unmanaged options in the error message
4874            // because they are only available in unsafe mode.
4875            sql_bail!("SIZE option must be specified");
4876        }
4877        (Some(size), availability_zone, billed_as, None, None, None, None, None) => {
4878            let disk_default = scx.catalog.system_vars().disk_cluster_replicas_default();
4879            let mut disk = disk_in.unwrap_or(disk_default);
4880
4881            // HACK(benesch): disk is always enabled for v2 cluster sizes, and
4882            // it is an error to specify `DISK = FALSE` or `DISK = TRUE`
4883            // explicitly.
4884            //
4885            // The long term plan is to phase out the v1 cluster sizes, at which
4886            // point we'll be able to remove the `DISK` option entirely and
4887            // simply always enable disk.
4888            if scx.catalog.is_cluster_size_cc(&size) {
4889                if disk_in.is_some() {
4890                    sql_bail!(
4891                        "DISK option not supported for non-legacy cluster sizes because disk is always enabled"
4892                    );
4893                }
4894                disk = true;
4895            }
4896
4897            Ok(ReplicaConfig::Orchestrated {
4898                size,
4899                availability_zone,
4900                compute,
4901                disk,
4902                billed_as,
4903                internal,
4904            })
4905        }
4906
4907        (
4908            None,
4909            None,
4910            None,
4911            storagectl_addresses,
4912            storage_addresses,
4913            computectl_addresses,
4914            compute_addresses,
4915            workers,
4916        ) => {
4917            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
4918
4919            // When manually testing Materialize in unsafe mode, it's easy to
4920            // accidentally omit one of these options, so we try to produce
4921            // helpful error messages.
4922            let Some(storagectl_addrs) = storagectl_addresses else {
4923                sql_bail!("missing STORAGECTL ADDRESSES option");
4924            };
4925            let Some(storage_addrs) = storage_addresses else {
4926                sql_bail!("missing STORAGE ADDRESSES option");
4927            };
4928            let Some(computectl_addrs) = computectl_addresses else {
4929                sql_bail!("missing COMPUTECTL ADDRESSES option");
4930            };
4931            let Some(compute_addrs) = compute_addresses else {
4932                sql_bail!("missing COMPUTE ADDRESSES option");
4933            };
4934            let workers = workers.unwrap_or(1);
4935
4936            if computectl_addrs.len() != compute_addrs.len() {
4937                sql_bail!("COMPUTECTL ADDRESSES and COMPUTE ADDRESSES must have the same length");
4938            }
4939            if storagectl_addrs.len() != storage_addrs.len() {
4940                sql_bail!("STORAGECTL ADDRESSES and STORAGE ADDRESSES must have the same length");
4941            }
4942            if storagectl_addrs.len() != computectl_addrs.len() {
4943                sql_bail!(
4944                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
4945                );
4946            }
4947
4948            if workers == 0 {
4949                sql_bail!("WORKERS must be greater than 0");
4950            }
4951
4952            if disk_in.is_some() {
4953                sql_bail!("DISK can't be specified for unorchestrated clusters");
4954            }
4955
4956            Ok(ReplicaConfig::Unorchestrated {
4957                storagectl_addrs,
4958                storage_addrs,
4959                computectl_addrs,
4960                compute_addrs,
4961                workers: workers.into(),
4962                compute,
4963            })
4964        }
4965        _ => {
4966            // We don't bother trying to produce a more helpful error message
4967            // here because no user is likely to hit this path.
4968            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
4969        }
4970    }
4971}
4972
4973/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
4974///
4975/// The reverse of [`unplan_compute_replica_config`].
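///
/// For example, `INTROSPECTION INTERVAL = '1s', INTROSPECTION DEBUGGING = TRUE`
/// yields an introspection config with a one-second interval and debugging
/// enabled, while an introspection interval of zero is treated as disabling
/// introspection entirely, which makes it an error to also request debugging.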
4976fn plan_compute_replica_config(
4977    introspection_interval: Option<OptionalDuration>,
4978    introspection_debugging: bool,
4979) -> Result<ComputeReplicaConfig, PlanError> {
4980    let introspection_interval = introspection_interval
4981        .map(|OptionalDuration(i)| i)
4982        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
4983    let introspection = match introspection_interval {
4984        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
4985            interval,
4986            debugging: introspection_debugging,
4987        }),
4988        None if introspection_debugging => {
4989            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
4990        }
4991        None => None,
4992    };
4993    let compute = ComputeReplicaConfig { introspection };
4994    Ok(compute)
4995}
4996
4997/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
4998///
4999/// The reverse of [`plan_compute_replica_config`].
5000fn unplan_compute_replica_config(
5001    compute_replica_config: ComputeReplicaConfig,
5002) -> (Option<OptionalDuration>, bool) {
5003    match compute_replica_config.introspection {
5004        Some(ComputeReplicaIntrospectionConfig {
5005            debugging,
5006            interval,
5007        }) => (Some(OptionalDuration(Some(interval))), debugging),
5008        None => (Some(OptionalDuration(None)), false),
5009    }
5010}
5011
5012/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5013///
5014/// The reverse of [`unplan_cluster_schedule`].
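///
/// For example:
///
/// ```sql
/// SCHEDULE = MANUAL
/// SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour')
/// ```
///
/// The hydration time estimate must be non-negative and must not involve
/// months or larger units, so that it converts cleanly to a duration.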
5015fn plan_cluster_schedule(
5016    schedule: ClusterScheduleOptionValue,
5017) -> Result<ClusterSchedule, PlanError> {
5018    Ok(match schedule {
5019        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5020        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5021        ClusterScheduleOptionValue::Refresh {
5022            hydration_time_estimate: None,
5023        } => ClusterSchedule::Refresh {
5024            hydration_time_estimate: Duration::from_millis(0),
5025        },
5026        // Otherwise we convert the `IntervalValue` to a `Duration`.
5027        ClusterScheduleOptionValue::Refresh {
5028            hydration_time_estimate: Some(interval_value),
5029        } => {
5030            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5031            if interval.as_microseconds() < 0 {
5032                sql_bail!(
5033                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5034                    interval
5035                );
5036            }
5037            if interval.months != 0 {
5038                // This limitation exists because we want this interval to be cleanly convertible
5039                // to a unix epoch timestamp difference. When the interval involves months, this
5040                // no longer holds, because months have variable lengths.
5041                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5042            }
5043            let duration = interval.duration()?;
5044            if u64::try_from(duration.as_millis()).is_err()
5045                || Interval::from_duration(&duration).is_err()
5046            {
5047                sql_bail!("HYDRATION TIME ESTIMATE too large");
5048            }
5049            ClusterSchedule::Refresh {
5050                hydration_time_estimate: duration,
5051            }
5052        }
5053    })
5054}
5055
5056/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5057///
5058/// The reverse of [`plan_cluster_schedule`].
5059fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5060    match schedule {
5061        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5062        ClusterSchedule::Refresh {
5063            hydration_time_estimate,
5064        } => {
5065            let interval = Interval::from_duration(&hydration_time_estimate)
5066                .expect("planning ensured that this is convertible back to Interval");
5067            let interval_value = literal::unplan_interval(&interval);
5068            ClusterScheduleOptionValue::Refresh {
5069                hydration_time_estimate: Some(interval_value),
5070            }
5071        }
5072    }
5073}
5074
5075pub fn describe_create_cluster_replica(
5076    _: &StatementContext,
5077    _: CreateClusterReplicaStatement<Aug>,
5078) -> Result<StatementDesc, PlanError> {
5079    Ok(StatementDesc::new(None))
5080}
5081
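/// Plans `CREATE CLUSTER REPLICA`.
///
/// For example (cluster, replica, and size names are illustrative):
///
/// ```sql
/// CREATE CLUSTER REPLICA prod.r2 (SIZE = '100cc');
/// ```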
5082pub fn plan_create_cluster_replica(
5083    scx: &StatementContext,
5084    CreateClusterReplicaStatement {
5085        definition: ReplicaDefinition { name, options },
5086        of_cluster,
5087    }: CreateClusterReplicaStatement<Aug>,
5088) -> Result<Plan, PlanError> {
5089    let cluster = scx
5090        .catalog
5091        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5092    let current_replica_count = cluster.replica_ids().iter().count();
5093    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5094        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5095        return Err(PlanError::CreateReplicaFailStorageObjects {
5096            current_replica_count,
5097            internal_replica_count,
5098            hypothetical_replica_count: current_replica_count + 1,
5099        });
5100    }
5101
5102    let config = plan_replica_config(scx, options)?;
5103
5104    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5105        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5106            return Err(PlanError::MangedReplicaName(name.into_string()));
5107        }
5108    } else {
5109        ensure_cluster_is_not_managed(scx, cluster.id())?;
5110    }
5111
5112    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5113        name: normalize::ident(name),
5114        cluster_id: cluster.id(),
5115        config,
5116    }))
5117}
5118
5119pub fn describe_create_secret(
5120    _: &StatementContext,
5121    _: CreateSecretStatement<Aug>,
5122) -> Result<StatementDesc, PlanError> {
5123    Ok(StatementDesc::new(None))
5124}
5125
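/// Plans `CREATE SECRET`.
///
/// For example (the name and value are illustrative):
///
/// ```sql
/// CREATE SECRET kafka_password AS 'p4ssw0rd';
/// ```
///
/// Note that the recorded `create_sql` replaces the secret value with
/// `'********'`, so the plaintext never enters the catalog.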
5126pub fn plan_create_secret(
5127    scx: &StatementContext,
5128    stmt: CreateSecretStatement<Aug>,
5129) -> Result<Plan, PlanError> {
5130    let CreateSecretStatement {
5131        name,
5132        if_not_exists,
5133        value,
5134    } = &stmt;
5135
5136    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5137    let mut create_sql_statement = stmt.clone();
5138    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5139    let create_sql =
5140        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5141    let secret_as = query::plan_secret_as(scx, value.clone())?;
5142
5143    let secret = Secret {
5144        create_sql,
5145        secret_as,
5146    };
5147
5148    Ok(Plan::CreateSecret(CreateSecretPlan {
5149        name,
5150        secret,
5151        if_not_exists: *if_not_exists,
5152    }))
5153}
5154
5155pub fn describe_create_connection(
5156    _: &StatementContext,
5157    _: CreateConnectionStatement<Aug>,
5158) -> Result<StatementDesc, PlanError> {
5159    Ok(StatementDesc::new(None))
5160}
5161
5162generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5163
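/// Plans `CREATE CONNECTION`.
///
/// For example (the broker address is illustrative; the `VALIDATE` option is
/// gated behind a feature flag):
///
/// ```sql
/// CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'broker:9092')
///     WITH (VALIDATE = false);
/// ```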
5164pub fn plan_create_connection(
5165    scx: &StatementContext,
5166    mut stmt: CreateConnectionStatement<Aug>,
5167) -> Result<Plan, PlanError> {
5168    let CreateConnectionStatement {
5169        name,
5170        connection_type,
5171        values,
5172        if_not_exists,
5173        with_options,
5174    } = stmt.clone();
5175    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5176    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5177    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5178
5179    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5180    if options.validate.is_some() {
5181        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5182    }
5183    let validate = match options.validate {
5184        Some(val) => val,
5185        None => {
5186            scx.catalog
5187                .system_vars()
5188                .enable_default_connection_validation()
5189                && details.to_connection().validate_by_default()
5190        }
5191    };
5192
5193    // Check for an object in the catalog with this same name
5194    let full_name = scx.catalog.resolve_full_name(&name);
5195    let partial_name = PartialItemName::from(full_name.clone());
5196    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5197        return Err(PlanError::ItemAlreadyExists {
5198            name: full_name.to_string(),
5199            item_type: item.item_type(),
5200        });
5201    }
5202
5203    // For SSH connections, overwrite the public key options based on the
5204    // connection details, in case we generated new keys during planning.
5205    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5206        stmt.values.retain(|v| {
5207            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5208        });
5209        stmt.values.push(ConnectionOption {
5210            name: ConnectionOptionName::PublicKey1,
5211            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5212        });
5213        stmt.values.push(ConnectionOption {
5214            name: ConnectionOptionName::PublicKey2,
5215            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5216        });
5217    }
5218    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5219
5220    let plan = CreateConnectionPlan {
5221        name,
5222        if_not_exists,
5223        connection: crate::plan::Connection {
5224            create_sql,
5225            details,
5226        },
5227        validate,
5228    };
5229    Ok(Plan::CreateConnection(plan))
5230}
5231
5232fn plan_drop_database(
5233    scx: &StatementContext,
5234    if_exists: bool,
5235    name: &UnresolvedDatabaseName,
5236    cascade: bool,
5237) -> Result<Option<DatabaseId>, PlanError> {
5238    Ok(match resolve_database(scx, name, if_exists)? {
5239        Some(database) => {
5240            if !cascade && database.has_schemas() {
5241                sql_bail!(
5242                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5243                    name,
5244                );
5245            }
5246            Some(database.id())
5247        }
5248        None => None,
5249    })
5250}
5251
5252pub fn describe_drop_objects(
5253    _: &StatementContext,
5254    _: DropObjectsStatement,
5255) -> Result<StatementDesc, PlanError> {
5256    Ok(StatementDesc::new(None))
5257}
5258
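/// Plans `DROP` of one or more objects of a single object type.
///
/// For example (object names are illustrative):
///
/// ```sql
/// DROP TABLE IF EXISTS t1, t2 CASCADE;
/// DROP CLUSTER prod;
/// ```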
5259pub fn plan_drop_objects(
5260    scx: &mut StatementContext,
5261    DropObjectsStatement {
5262        object_type,
5263        if_exists,
5264        names,
5265        cascade,
5266    }: DropObjectsStatement,
5267) -> Result<Plan, PlanError> {
5268    assert_ne!(
5269        object_type,
5270        mz_sql_parser::ast::ObjectType::Func,
5271        "rejected in parser"
5272    );
5273    let object_type = object_type.into();
5274
5275    let mut referenced_ids = Vec::new();
5276    for name in names {
5277        let id = match &name {
5278            UnresolvedObjectName::Cluster(name) => {
5279                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5280            }
5281            UnresolvedObjectName::ClusterReplica(name) => {
5282                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5283            }
5284            UnresolvedObjectName::Database(name) => {
5285                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5286            }
5287            UnresolvedObjectName::Schema(name) => {
5288                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5289            }
5290            UnresolvedObjectName::Role(name) => {
5291                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5292            }
5293            UnresolvedObjectName::Item(name) => {
5294                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5295                    .map(ObjectId::Item)
5296            }
5297            UnresolvedObjectName::NetworkPolicy(name) => {
5298                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5299            }
5300        };
5301        match id {
5302            Some(id) => referenced_ids.push(id),
5303            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5304                name: name.to_ast_string_simple(),
5305                object_type,
5306            }),
5307        }
5308    }
5309    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5310
5311    Ok(Plan::DropObjects(DropObjectsPlan {
5312        referenced_ids,
5313        drop_ids,
5314        object_type,
5315    }))
5316}
5317
5318fn plan_drop_schema(
5319    scx: &StatementContext,
5320    if_exists: bool,
5321    name: &UnresolvedSchemaName,
5322    cascade: bool,
5323) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5324    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5325        Some((database_spec, schema_spec)) => {
5326            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5327                sql_bail!(
5328                    "cannot drop schema {name} because it is required by the database system",
5329                );
5330            }
5331            if let SchemaSpecifier::Temporary = schema_spec {
5332                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5333            }
5334            let schema = scx.get_schema(&database_spec, &schema_spec);
5335            if !cascade && schema.has_items() {
5336                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5337                sql_bail!(
5338                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5339                    full_schema_name
5340                );
5341            }
5342            Some((database_spec, schema_spec))
5343        }
5344        None => None,
5345    })
5346}
5347
5348fn plan_drop_role(
5349    scx: &StatementContext,
5350    if_exists: bool,
5351    name: &Ident,
5352) -> Result<Option<RoleId>, PlanError> {
5353    match scx.catalog.resolve_role(name.as_str()) {
5354        Ok(role) => {
5355            let id = role.id();
5356            if &id == scx.catalog.active_role_id() {
5357                sql_bail!("current role cannot be dropped");
5358            }
5359            for role in scx.catalog.get_roles() {
5360                for (member_id, grantor_id) in role.membership() {
5361                    if &id == grantor_id {
5362                        let member_role = scx.catalog.get_role(member_id);
5363                        sql_bail!(
5364                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5365                            name.as_str(),
5366                            role.name(),
5367                            member_role.name()
5368                        );
5369                    }
5370                }
5371            }
5372            Ok(Some(role.id()))
5373        }
5374        Err(_) if if_exists => Ok(None),
5375        Err(e) => Err(e.into()),
5376    }
5377}
5378
5379fn plan_drop_cluster(
5380    scx: &StatementContext,
5381    if_exists: bool,
5382    name: &Ident,
5383    cascade: bool,
5384) -> Result<Option<ClusterId>, PlanError> {
5385    Ok(match resolve_cluster(scx, name, if_exists)? {
5386        Some(cluster) => {
5387            if !cascade && !cluster.bound_objects().is_empty() {
5388                return Err(PlanError::DependentObjectsStillExist {
5389                    object_type: "cluster".to_string(),
5390                    object_name: cluster.name().to_string(),
5391                    dependents: Vec::new(),
5392                });
5393            }
5394            Some(cluster.id())
5395        }
5396        None => None,
5397    })
5398}
5399
5400fn plan_drop_network_policy(
5401    scx: &StatementContext,
5402    if_exists: bool,
5403    name: &Ident,
5404) -> Result<Option<NetworkPolicyId>, PlanError> {
5405    match scx.catalog.resolve_network_policy(name.as_str()) {
5406        Ok(policy) => {
5407            // TODO(network_policy): When we support role based network policies, check if any role
5408            // currently has the specified policy set.
5409            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5410                Err(PlanError::NetworkPolicyInUse)
5411            } else {
5412                Ok(Some(policy.id()))
5413            }
5414        }
5415        Err(_) if if_exists => Ok(None),
5416        Err(e) => Err(e.into()),
5417    }
5418}
5419
5420/// Returns `true` if the cluster has any object that requires a single replica.
5421/// Returns `false` if the cluster has no objects.
5422fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5423    // If this feature is enabled then all objects support multiple-replicas
5424    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5425        false
5426    } else {
5427        // Otherwise we check for the existence of sources or sinks.
5428        cluster.bound_objects().iter().any(|id| {
5429            let item = scx.catalog.get_item(id);
5430            matches!(
5431                item.item_type(),
5432                CatalogItemType::Sink | CatalogItemType::Source
5433            )
5434        })
5435    }
5436}
5437
5438fn plan_drop_cluster_replica(
5439    scx: &StatementContext,
5440    if_exists: bool,
5441    name: &QualifiedReplica,
5442) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5443    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5444    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5445}
5446
5447/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5448fn plan_drop_item(
5449    scx: &StatementContext,
5450    object_type: ObjectType,
5451    if_exists: bool,
5452    name: UnresolvedItemName,
5453    cascade: bool,
5454) -> Result<Option<CatalogItemId>, PlanError> {
5455    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5456        Ok(r) => r,
5457        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5458        Err(PlanError::MismatchedObjectType {
5459            name,
5460            is_type: ObjectType::MaterializedView,
5461            expected_type: ObjectType::View,
5462        }) => {
5463            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5464        }
5465        e => e?,
5466    };
5467
5468    Ok(match resolved {
5469        Some(catalog_item) => {
5470            if catalog_item.id().is_system() {
5471                sql_bail!(
5472                    "cannot drop {} {} because it is required by the database system",
5473                    catalog_item.item_type(),
5474                    scx.catalog.minimal_qualification(catalog_item.name()),
5475                );
5476            }
5477
5478            if !cascade {
5479                for id in catalog_item.used_by() {
5480                    let dep = scx.catalog.get_item(id);
5481                    if dependency_prevents_drop(object_type, dep) {
5482                        return Err(PlanError::DependentObjectsStillExist {
5483                            object_type: catalog_item.item_type().to_string(),
5484                            object_name: scx
5485                                .catalog
5486                                .minimal_qualification(catalog_item.name())
5487                                .to_string(),
5488                            dependents: vec![(
5489                                dep.item_type().to_string(),
5490                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5491                            )],
5492                        });
5493                    }
5494                }
5495                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5496                //  relies on this entry. Unfortunately, we don't have that information readily available.
5497            }
5498            Some(catalog_item.id())
5499        }
5500        None => None,
5501    })
5502}
5503
5504/// Does the dependency `dep` prevent a non-cascading drop of an object of type `object_type`?
5505fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5506    match object_type {
5507        ObjectType::Type => true,
5508        _ => match dep.item_type() {
5509            CatalogItemType::Func
5510            | CatalogItemType::Table
5511            | CatalogItemType::Source
5512            | CatalogItemType::View
5513            | CatalogItemType::MaterializedView
5514            | CatalogItemType::Sink
5515            | CatalogItemType::Type
5516            | CatalogItemType::Secret
5517            | CatalogItemType::Connection
5518            | CatalogItemType::ContinualTask => true,
5519            CatalogItemType::Index => false,
5520        },
5521    }
5522}
5523
5524pub fn describe_alter_index_options(
5525    _: &StatementContext,
5526    _: AlterIndexStatement<Aug>,
5527) -> Result<StatementDesc, PlanError> {
5528    Ok(StatementDesc::new(None))
5529}
5530
5531pub fn describe_drop_owned(
5532    _: &StatementContext,
5533    _: DropOwnedStatement<Aug>,
5534) -> Result<StatementDesc, PlanError> {
5535    Ok(StatementDesc::new(None))
5536}
5537
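/// Plans a `DROP OWNED` statement, which drops all objects owned by the given
/// roles and revokes all privileges (including default privileges) granted to
/// them, e.g. `DROP OWNED BY my_role CASCADE` (illustrative syntax).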
5538pub fn plan_drop_owned(
5539    scx: &StatementContext,
5540    drop: DropOwnedStatement<Aug>,
5541) -> Result<Plan, PlanError> {
5542    let cascade = drop.cascade();
5543    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5544    let mut drop_ids = Vec::new();
5545    let mut privilege_revokes = Vec::new();
5546    let mut default_privilege_revokes = Vec::new();
5547
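    // Collects, for the given object, the privileges whose grantee is one of
    // the dropped roles, so that they can be revoked.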
5548    fn update_privilege_revokes(
5549        object_id: SystemObjectId,
5550        privileges: &PrivilegeMap,
5551        role_ids: &BTreeSet<RoleId>,
5552        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5553    ) {
5554        privilege_revokes.extend(iter::zip(
5555            iter::repeat(object_id),
5556            privileges
5557                .all_values()
5558                .filter(|privilege| role_ids.contains(&privilege.grantee))
5559                .cloned(),
5560        ));
5561    }
5562
5563    // Replicas
5564    for replica in scx.catalog.get_cluster_replicas() {
5565        if role_ids.contains(&replica.owner_id()) {
5566            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5567        }
5568    }
5569
5570    // Clusters
5571    for cluster in scx.catalog.get_clusters() {
5572        if role_ids.contains(&cluster.owner_id()) {
5573            // Note: CASCADE is not required for replicas.
5574            if !cascade {
5575                let non_owned_bound_objects: Vec<_> = cluster
5576                    .bound_objects()
5577                    .into_iter()
5578                    .map(|item_id| scx.catalog.get_item(item_id))
5579                    .filter(|item| !role_ids.contains(&item.owner_id()))
5580                    .collect();
5581                if !non_owned_bound_objects.is_empty() {
5582                    let names: Vec<_> = non_owned_bound_objects
5583                        .into_iter()
5584                        .map(|item| {
5585                            (
5586                                item.item_type().to_string(),
5587                                scx.catalog.resolve_full_name(item.name()).to_string(),
5588                            )
5589                        })
5590                        .collect();
5591                    return Err(PlanError::DependentObjectsStillExist {
5592                        object_type: "cluster".to_string(),
5593                        object_name: cluster.name().to_string(),
5594                        dependents: names,
5595                    });
5596                }
5597            }
5598            drop_ids.push(cluster.id().into());
5599        }
5600        update_privilege_revokes(
5601            SystemObjectId::Object(cluster.id().into()),
5602            cluster.privileges(),
5603            &role_ids,
5604            &mut privilege_revokes,
5605        );
5606    }
5607
5608    // Items
5609    for item in scx.catalog.get_items() {
5610        if role_ids.contains(&item.owner_id()) {
5611            if !cascade {
5612                // Checks if any items still depend on this one, returning an error if so.
5613                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5614                    let non_owned_dependencies: Vec<_> = used_by
5615                        .into_iter()
5616                        .map(|item_id| scx.catalog.get_item(item_id))
5617                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5618                        .filter(|item| !role_ids.contains(&item.owner_id()))
5619                        .collect();
5620                    if !non_owned_dependencies.is_empty() {
5621                        let names: Vec<_> = non_owned_dependencies
5622                            .into_iter()
5623                            .map(|item| {
5624                                let item_typ = item.item_type().to_string();
5625                                let item_name =
5626                                    scx.catalog.resolve_full_name(item.name()).to_string();
5627                                (item_typ, item_name)
5628                            })
5629                            .collect();
5630                        Err(PlanError::DependentObjectsStillExist {
5631                            object_type: item.item_type().to_string(),
5632                            object_name: scx
5633                                .catalog
5634                                .resolve_full_name(item.name())
5636                                .to_string(),
5637                            dependents: names,
5638                        })
5639                    } else {
5640                        Ok(())
5641                    }
5642                };
5643
5644                // When this item gets dropped it will also drop its progress source, so we need to
5645                // check the users of that source as well.
5646                if let Some(id) = item.progress_id() {
5647                    let progress_item = scx.catalog.get_item(&id);
5648                    check_if_dependents_exist(progress_item.used_by())?;
5649                }
5650                check_if_dependents_exist(item.used_by())?;
5651            }
5652            drop_ids.push(item.id().into());
5653        }
5654        update_privilege_revokes(
5655            SystemObjectId::Object(item.id().into()),
5656            item.privileges(),
5657            &role_ids,
5658            &mut privilege_revokes,
5659        );
5660    }
5661
5662    // Schemas
5663    for schema in scx.catalog.get_schemas() {
5664        if !schema.id().is_temporary() {
5665            if role_ids.contains(&schema.owner_id()) {
5666                if !cascade {
5667                    let non_owned_dependencies: Vec<_> = schema
5668                        .item_ids()
5669                        .map(|item_id| scx.catalog.get_item(&item_id))
5670                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5671                        .filter(|item| !role_ids.contains(&item.owner_id()))
5672                        .collect();
5673                    if !non_owned_dependencies.is_empty() {
5674                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5675                        sql_bail!(
5676                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5677                            full_schema_name.to_string().quoted()
5678                        );
5679                    }
5680                }
5681                drop_ids.push((*schema.database(), *schema.id()).into())
5682            }
5683            update_privilege_revokes(
5684                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5685                schema.privileges(),
5686                &role_ids,
5687                &mut privilege_revokes,
5688            );
5689        }
5690    }
5691
5692    // Databases
5693    for database in scx.catalog.get_databases() {
5694        if role_ids.contains(&database.owner_id()) {
5695            if !cascade {
5696                let non_owned_schemas: Vec<_> = database
5697                    .schemas()
5698                    .into_iter()
5699                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5700                    .collect();
5701                if !non_owned_schemas.is_empty() {
5702                    sql_bail!(
5703                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5704                        database.name().quoted(),
5705                    );
5706                }
5707            }
5708            drop_ids.push(database.id().into());
5709        }
5710        update_privilege_revokes(
5711            SystemObjectId::Object(database.id().into()),
5712            database.privileges(),
5713            &role_ids,
5714            &mut privilege_revokes,
5715        );
5716    }
5717
5718    // System
5719    update_privilege_revokes(
5720        SystemObjectId::System,
5721        scx.catalog.get_system_privileges(),
5722        &role_ids,
5723        &mut privilege_revokes,
5724    );
5725
5726    for (default_privilege_object, default_privilege_acl_items) in
5727        scx.catalog.get_default_privileges()
5728    {
5729        for default_privilege_acl_item in default_privilege_acl_items {
5730            if role_ids.contains(&default_privilege_object.role_id)
5731                || role_ids.contains(&default_privilege_acl_item.grantee)
5732            {
5733                default_privilege_revokes.push((
5734                    default_privilege_object.clone(),
5735                    default_privilege_acl_item.clone(),
5736                ));
5737            }
5738        }
5739    }
5740
5741    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5742
5743    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5744    if !system_ids.is_empty() {
5745        let mut owners = system_ids
5746            .into_iter()
5747            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5748            .collect::<BTreeSet<_>>()
5749            .into_iter()
5750            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5751        sql_bail!(
5752            "cannot drop objects owned by role {} because they are required by the database system",
5753            owners.join(", "),
5754        );
5755    }
5756
5757    Ok(Plan::DropOwned(DropOwnedPlan {
5758        role_ids: role_ids.into_iter().collect(),
5759        drop_ids,
5760        privilege_revokes,
5761        default_privilege_revokes,
5762    }))
5763}
5764
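/// Maps an optional `RETAIN HISTORY` value to a compaction window, e.g. the
/// `'1000 hours'` in `... WITH (RETAIN HISTORY FOR '1000 hours')` (illustrative
/// syntax). Returns `None` when the option was not specified at all.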
5765fn plan_retain_history_option(
5766    scx: &StatementContext,
5767    retain_history: Option<OptionalDuration>,
5768) -> Result<Option<CompactionWindow>, PlanError> {
5769    if let Some(OptionalDuration(lcw)) = retain_history {
5770        Ok(Some(plan_retain_history(scx, lcw)?))
5771    } else {
5772        Ok(None)
5773    }
5774}
5775
5776// Converts a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5777// `DisableCompaction`. A zero duration is an error: the `OptionalDuration` type already converts
5778// a zero duration into `None`, so a literal zero reaching this function indicates an internal
5779// error. This function must not be called in the `RESET (RETAIN HISTORY)` path, which should be
5780// handled by the outer `Option<OptionalDuration>` being `None`.
5781fn plan_retain_history(
5782    scx: &StatementContext,
5783    lcw: Option<Duration>,
5784) -> Result<CompactionWindow, PlanError> {
5785    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5786    match lcw {
5787        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
5788        // disable compaction), and should never occur here. Furthermore, some things actually do
5789        // break when this is set to real zero:
5790        // https://github.com/MaterializeInc/database-issues/issues/3798.
5791        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5792            option_name: "RETAIN HISTORY".to_string(),
5793            err: Box::new(PlanError::Unstructured(
5794                "internal error: unexpectedly zero".to_string(),
5795            )),
5796        }),
5797        Some(duration) => {
5798            // Error if the duration is low and enable_unlimited_retain_history is not set (which
5799            // should only be possible during testing).
5800            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5801                && scx
5802                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5803                    .is_err()
5804            {
5805                return Err(PlanError::RetainHistoryLow {
5806                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5807                });
5808            }
5809            Ok(duration.try_into()?)
5810        }
5811        // In the past, `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction is
5812        // generally a bad choice, so prevent it unless the unlimited retain history flag is set.
5813        None => {
5814            if scx
5815                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5816                .is_err()
5817            {
5818                Err(PlanError::RetainHistoryRequired)
5819            } else {
5820                Ok(CompactionWindow::DisableCompaction)
5821            }
5822        }
5823    }
5824}
5825
5826generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5827
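/// Plans the `WITH (...)` options accepted by `CREATE INDEX`, e.g.
/// `CREATE INDEX i ON t (a) WITH (RETAIN HISTORY FOR '30 days')` (illustrative
/// syntax); `RETAIN HISTORY` is currently the only supported index option.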
5828fn plan_index_options(
5829    scx: &StatementContext,
5830    with_opts: Vec<IndexOption<Aug>>,
5831) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5832    if !with_opts.is_empty() {
5833        // Index options are not durable.
5834        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5835    }
5836
5837    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5838
5839    let mut out = Vec::with_capacity(1);
5840    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5841        out.push(crate::plan::IndexOption::RetainHistory(cw));
5842    }
5843    Ok(out)
5844}
5845
5846generate_extracted_config!(
5847    TableOption,
5848    (PartitionBy, Vec<Ident>),
5849    (RetainHistory, OptionalDuration),
5850    (RedactedTest, String)
5851);
5852
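/// Plans the `WITH (...)` options accepted by `CREATE TABLE`, e.g.
/// `CREATE TABLE t (a int) WITH (RETAIN HISTORY FOR '2 days')` (illustrative
/// syntax); the `PARTITION BY` and redacted-test options are feature-gated.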
5853fn plan_table_options(
5854    scx: &StatementContext,
5855    desc: &RelationDesc,
5856    with_opts: Vec<TableOption<Aug>>,
5857) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5858    let TableOptionExtracted {
5859        partition_by,
5860        retain_history,
5861        redacted_test,
5862        ..
5863    }: TableOptionExtracted = with_opts.try_into()?;
5864
5865    if let Some(partition_by) = partition_by {
5866        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5867        check_partition_by(desc, partition_by)?;
5868    }
5869
5870    if redacted_test.is_some() {
5871        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5872    }
5873
5874    let mut out = Vec::with_capacity(1);
5875    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5876        out.push(crate::plan::TableOption::RetainHistory(cw));
5877    }
5878    Ok(out)
5879}
5880
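/// Plans `ALTER INDEX ... SET/RESET (RETAIN HISTORY ...)`, e.g.
/// `ALTER INDEX i SET (RETAIN HISTORY FOR '1 day')` (illustrative syntax).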
5881pub fn plan_alter_index_options(
5882    scx: &mut StatementContext,
5883    AlterIndexStatement {
5884        index_name,
5885        if_exists,
5886        action,
5887    }: AlterIndexStatement<Aug>,
5888) -> Result<Plan, PlanError> {
5889    let object_type = ObjectType::Index;
5890    match action {
5891        AlterIndexAction::ResetOptions(options) => {
5892            let mut options = options.into_iter();
5893            if let Some(opt) = options.next() {
5894                match opt {
5895                    IndexOptionName::RetainHistory => {
5896                        if options.next().is_some() {
5897                            sql_bail!("RETAIN HISTORY must be the only option");
5898                        }
5899                        return alter_retain_history(
5900                            scx,
5901                            object_type,
5902                            if_exists,
5903                            UnresolvedObjectName::Item(index_name),
5904                            None,
5905                        );
5906                    }
5907                }
5908            }
5909            sql_bail!("expected option");
5910        }
5911        AlterIndexAction::SetOptions(options) => {
5912            let mut options = options.into_iter();
5913            if let Some(opt) = options.next() {
5914                match opt.name {
5915                    IndexOptionName::RetainHistory => {
5916                        if options.next().is_some() {
5917                            sql_bail!("RETAIN HISTORY must be the only option");
5918                        }
5919                        return alter_retain_history(
5920                            scx,
5921                            object_type,
5922                            if_exists,
5923                            UnresolvedObjectName::Item(index_name),
5924                            opt.value,
5925                        );
5926                    }
5927                }
5928            }
5929            sql_bail!("expected option");
5930        }
5931    }
5932}
5933
5934pub fn describe_alter_cluster_set_options(
5935    _: &StatementContext,
5936    _: AlterClusterStatement<Aug>,
5937) -> Result<StatementDesc, PlanError> {
5938    Ok(StatementDesc::new(None))
5939}
5940
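/// Plans an `ALTER CLUSTER` statement, e.g.
/// `ALTER CLUSTER c SET (REPLICATION FACTOR 2)` (illustrative syntax), covering
/// both `SET` and `RESET` of cluster options.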
5941pub fn plan_alter_cluster(
5942    scx: &mut StatementContext,
5943    AlterClusterStatement {
5944        name,
5945        action,
5946        if_exists,
5947    }: AlterClusterStatement<Aug>,
5948) -> Result<Plan, PlanError> {
5949    let cluster = match resolve_cluster(scx, &name, if_exists)? {
5950        Some(entry) => entry,
5951        None => {
5952            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5953                name: name.to_ast_string_simple(),
5954                object_type: ObjectType::Cluster,
5955            });
5956
5957            return Ok(Plan::AlterNoop(AlterNoopPlan {
5958                object_type: ObjectType::Cluster,
5959            }));
5960        }
5961    };
5962
5963    let mut options: PlanClusterOption = Default::default();
5964    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
5965
5966    match action {
5967        AlterClusterAction::SetOptions {
5968            options: set_options,
5969            with_options,
5970        } => {
5971            let ClusterOptionExtracted {
5972                availability_zones,
5973                introspection_debugging,
5974                introspection_interval,
5975                managed,
5976                replicas: replica_defs,
5977                replication_factor,
5978                seen: _,
5979                size,
5980                disk,
5981                schedule,
5982                workload_class,
5983            }: ClusterOptionExtracted = set_options.try_into()?;
5984
5985            if !scx.catalog.active_role_id().is_system() {
5986                if workload_class.is_some() {
5987                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
5988                }
5989            }
5990
5991            match managed.unwrap_or_else(|| cluster.is_managed()) {
5992                true => {
5993                    let alter_strategy_extracted =
5994                        ClusterAlterOptionExtracted::try_from(with_options)?;
5995                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
5996
5997                    match alter_strategy {
5998                        AlterClusterPlanStrategy::None => {}
5999                        _ => {
6000                            scx.require_feature_flag(
6001                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6002                            )?;
6003                        }
6004                    }
6005
6006                    if replica_defs.is_some() {
6007                        sql_bail!("REPLICAS not supported for managed clusters");
6008                    }
6009                    if schedule.is_some()
6010                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6011                    {
6012                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6013                    }
6014
6015                    if let Some(replication_factor) = replication_factor {
6016                        if schedule.is_some()
6017                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6018                        {
6019                            sql_bail!(
6020                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6021                            );
6022                        }
6023                        if let Some(current_schedule) = cluster.schedule() {
6024                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6025                                sql_bail!(
6026                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6027                                );
6028                            }
6029                        }
6030
6031                        let internal_replica_count =
6032                            cluster.replicas().iter().filter(|r| r.internal()).count();
6033                        let hypothetical_replica_count =
6034                            internal_replica_count + usize::cast_from(replication_factor);
6035
6036                        // Total number of replicas running is internal replicas
6037                        // + replication factor.
6038                        if contains_single_replica_objects(scx, cluster)
6039                            && hypothetical_replica_count > 1
6040                        {
6041                            return Err(PlanError::CreateReplicaFailStorageObjects {
6042                                current_replica_count: cluster.replica_ids().iter().count(),
6043                                internal_replica_count,
6044                                hypothetical_replica_count,
6045                            });
6046                        }
6047                    } else if alter_strategy.is_some() {
6048                        // Alter strategies other than `None` will stand up pending replicas of the
6049                        // new configuration, which violates the single-replica constraint for sources
6050                        // and sinks. If the cluster contains any such storage objects, just fail.
6051                        let internal_replica_count =
6052                            cluster.replicas().iter().filter(|r| r.internal()).count();
6053                        let hypothetical_replica_count = internal_replica_count * 2;
6054                        if contains_single_replica_objects(scx, cluster) {
6055                            return Err(PlanError::CreateReplicaFailStorageObjects {
6056                                current_replica_count: cluster.replica_ids().iter().count(),
6057                                internal_replica_count,
6058                                hypothetical_replica_count,
6059                            });
6060                        }
6061                    }
6062                }
6063                false => {
6064                    if alter_strategy.is_some() {
6065                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6066                    }
6067                    if availability_zones.is_some() {
6068                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6069                    }
6070                    if replication_factor.is_some() {
6071                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6072                    }
6073                    if introspection_debugging.is_some() {
6074                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6075                    }
6076                    if introspection_interval.is_some() {
6077                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6078                    }
6079                    if size.is_some() {
6080                        sql_bail!("SIZE not supported for unmanaged clusters");
6081                    }
6082                    if disk.is_some() {
6083                        sql_bail!("DISK not supported for unmanaged clusters");
6084                    }
6085                    if schedule.is_some()
6086                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6087                    {
6088                        sql_bail!(
6089                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6090                        );
6091                    }
6092                    if let Some(current_schedule) = cluster.schedule() {
6093                        if !matches!(current_schedule, ClusterSchedule::Manual)
6094                            && schedule.is_none()
6095                        {
6096                            sql_bail!(
6097                                "when switching a cluster to unmanaged, if the managed \
6098                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6099                                explicitly set the SCHEDULE to MANUAL"
6100                            );
6101                        }
6102                    }
6103                }
6104            }
6105
6106            let mut replicas = vec![];
6107            for ReplicaDefinition { name, options } in
6108                replica_defs.into_iter().flat_map(Vec::into_iter)
6109            {
6110                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6111            }
6112
6113            if let Some(managed) = managed {
6114                options.managed = AlterOptionParameter::Set(managed);
6115            }
6116            if let Some(replication_factor) = replication_factor {
6117                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6118            }
6119            if let Some(size) = &size {
6120                // HACK(benesch): disk is always enabled for v2 cluster sizes,
6121                // and it is an error to specify `DISK = FALSE` or `DISK = TRUE`
6122                // explicitly.
6123                //
6124                // The long term plan is to phase out the v1 cluster sizes, at
6125                // which point we'll be able to remove the `DISK` option
6126                // entirely and simply always enable disk.
6127                if scx.catalog.is_cluster_size_cc(size) {
6128                    if disk.is_some() {
6129                        sql_bail!(
6130                            "DISK option not supported for modern cluster sizes because disk is always enabled"
6131                        );
6132                    } else {
6133                        options.disk = AlterOptionParameter::Set(true);
6134                    }
6135                }
6136                options.size = AlterOptionParameter::Set(size.clone());
6137            }
6138            if let Some(availability_zones) = availability_zones {
6139                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6140            }
6141            if let Some(introspection_debugging) = introspection_debugging {
6142                options.introspection_debugging =
6143                    AlterOptionParameter::Set(introspection_debugging);
6144            }
6145            if let Some(introspection_interval) = introspection_interval {
6146                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6147            }
6148            if let Some(disk) = disk {
6149                // HACK(benesch): disk is always enabled for v2 cluster sizes,
6150                // and it is an error to specify `DISK = FALSE` or `DISK = TRUE`
6151                // explicitly.
6152                //
6153                // The long term plan is to phase out the v1 cluster sizes, at
6154                // which point we'll be able to remove the `DISK` option
6155                // entirely and simply always enable disk.
6156                let size = size.as_deref().unwrap_or_else(|| {
6157                    cluster.managed_size().expect("cluster known to be managed")
6158                });
6159                if scx.catalog.is_cluster_size_cc(size) {
6160                    sql_bail!(
6161                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6162                    );
6163                }
6164
6165                if disk {
6166                    scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
6167                }
6168                options.disk = AlterOptionParameter::Set(disk);
6169            }
6170            if !replicas.is_empty() {
6171                options.replicas = AlterOptionParameter::Set(replicas);
6172            }
6173            if let Some(schedule) = schedule {
6174                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6175            }
6176            if let Some(workload_class) = workload_class {
6177                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6178            }
6179        }
6180        AlterClusterAction::ResetOptions(reset_options) => {
6181            use AlterOptionParameter::Reset;
6182            use ClusterOptionName::*;
6183
6184            if !scx.catalog.active_role_id().is_system() {
6185                if reset_options.contains(&WorkloadClass) {
6186                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6187                }
6188            }
6189
6190            for option in reset_options {
6191                match option {
6192                    AvailabilityZones => options.availability_zones = Reset,
6193                    Disk => options.disk = Reset,
6194                    IntrospectionInterval => options.introspection_interval = Reset,
6195                    IntrospectionDebugging => options.introspection_debugging = Reset,
6196                    Managed => options.managed = Reset,
6197                    Replicas => options.replicas = Reset,
6198                    ReplicationFactor => options.replication_factor = Reset,
6199                    Size => options.size = Reset,
6200                    Schedule => options.schedule = Reset,
6201                    WorkloadClass => options.workload_class = Reset,
6202                }
6203            }
6204        }
6205    }
6206    Ok(Plan::AlterCluster(AlterClusterPlan {
6207        id: cluster.id(),
6208        name: cluster.name().to_string(),
6209        options,
6210        strategy: alter_strategy,
6211    }))
6212}
6213
6214pub fn describe_alter_set_cluster(
6215    _: &StatementContext,
6216    _: AlterSetClusterStatement<Aug>,
6217) -> Result<StatementDesc, PlanError> {
6218    Ok(StatementDesc::new(None))
6219}
6220
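/// Plans `ALTER <object> ... SET CLUSTER`, e.g.
/// `ALTER MATERIALIZED VIEW mv SET CLUSTER c` (illustrative syntax); currently
/// only materialized views support moving to another cluster.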
6221pub fn plan_alter_item_set_cluster(
6222    scx: &StatementContext,
6223    AlterSetClusterStatement {
6224        if_exists,
6225        set_cluster: in_cluster_name,
6226        name,
6227        object_type,
6228    }: AlterSetClusterStatement<Aug>,
6229) -> Result<Plan, PlanError> {
6230    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6231
6232    let object_type = object_type.into();
6233
6234    // Prevent access to `SET CLUSTER` for unsupported objects.
6235    match object_type {
6236        ObjectType::MaterializedView => {}
6237        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6238            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6239        }
6240        _ => {
6241            bail_never_supported!(
6242                format!("ALTER {object_type} SET CLUSTER"),
6243                "sql/alter-set-cluster/",
6244                format!("{object_type} has no associated cluster")
6245            )
6246        }
6247    }
6248
6249    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6250
6251    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6252        Some(entry) => {
6253            let current_cluster = entry.cluster_id();
6254            let Some(current_cluster) = current_cluster else {
6255                sql_bail!("No cluster associated with {name}");
6256            };
6257
6258            if current_cluster == in_cluster.id() {
6259                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6260            } else {
6261                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6262                    id: entry.id(),
6263                    set_cluster: in_cluster.id(),
6264                }))
6265            }
6266        }
6267        None => {
6268            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6269                name: name.to_ast_string_simple(),
6270                object_type,
6271            });
6272
6273            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6274        }
6275    }
6276}
6277
6278pub fn describe_alter_object_rename(
6279    _: &StatementContext,
6280    _: AlterObjectRenameStatement,
6281) -> Result<StatementDesc, PlanError> {
6282    Ok(StatementDesc::new(None))
6283}
6284
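/// Plans an `ALTER ... RENAME TO` statement, e.g. `ALTER TABLE t RENAME TO t2`
/// (illustrative syntax), dispatching on the object type.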
6285pub fn plan_alter_object_rename(
6286    scx: &mut StatementContext,
6287    AlterObjectRenameStatement {
6288        name,
6289        object_type,
6290        to_item_name,
6291        if_exists,
6292    }: AlterObjectRenameStatement,
6293) -> Result<Plan, PlanError> {
6294    let object_type = object_type.into();
6295    match (object_type, name) {
6296        (
6297            ObjectType::View
6298            | ObjectType::MaterializedView
6299            | ObjectType::Table
6300            | ObjectType::Source
6301            | ObjectType::Index
6302            | ObjectType::Sink
6303            | ObjectType::Secret
6304            | ObjectType::Connection,
6305            UnresolvedObjectName::Item(name),
6306        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6307        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6308            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6309        }
6310        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6311            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6312        }
6313        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6314            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6315        }
6316        (object_type, name) => {
6317            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6318        }
6319    }
6320}
6321
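/// Plans `ALTER SCHEMA ... RENAME TO`, e.g. `ALTER SCHEMA s RENAME TO s2`
/// (illustrative syntax). System schemas cannot be renamed.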
6322pub fn plan_alter_schema_rename(
6323    scx: &mut StatementContext,
6324    name: UnresolvedSchemaName,
6325    to_schema_name: Ident,
6326    if_exists: bool,
6327) -> Result<Plan, PlanError> {
6328    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6329        let object_type = ObjectType::Schema;
6330        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6331            name: name.to_ast_string_simple(),
6332            object_type,
6333        });
6334        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6335    };
6336
6337    // Make sure the name is unique.
6338    if scx
6339        .resolve_schema_in_database(&db_spec, &to_schema_name)
6340        .is_ok()
6341    {
6342        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6343            to_schema_name.clone().into_string(),
6344        )));
6345    }
6346
6347    // Prevent users from renaming system related schemas.
6348    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6349    if schema.id().is_system() {
6350        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6351    }
6352
6353    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6354        cur_schema_spec: (db_spec, schema_spec),
6355        new_schema_name: to_schema_name.into_string(),
6356    }))
6357}
6358
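/// Plans `ALTER SCHEMA ... SWAP WITH`, e.g. `ALTER SCHEMA a SWAP WITH b`
/// (illustrative syntax). The swap goes through a uniquely named temporary
/// schema generated via `gen_temp_suffix`.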
6359pub fn plan_alter_schema_swap<F>(
6360    scx: &mut StatementContext,
6361    name_a: UnresolvedSchemaName,
6362    name_b: Ident,
6363    gen_temp_suffix: F,
6364) -> Result<Plan, PlanError>
6365where
6366    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6367{
6368    let schema_a = scx.resolve_schema(name_a.clone())?;
6369
6370    let db_spec = schema_a.database().clone();
6371    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6372        sql_bail!("cannot swap schemas that are in the ambient database");
6373    };
6374    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6375
6376    // We cannot swap system schemas.
6377    if schema_a.id().is_system() || schema_b.id().is_system() {
6378        bail_never_supported!("swapping a system schema".to_string())
6379    }
6380
6381    // Generate a temporary name we can swap schema_a to.
6382    //
6383    // `check` returns whether the temporary schema name is free to use.
6384    let check = |temp_suffix: &str| {
6385        let mut temp_name = ident!("mz_schema_swap_");
6386        temp_name.append_lossy(temp_suffix);
6387        scx.resolve_schema_in_database(&db_spec, &temp_name)
6388            .is_err()
6389    };
6390    let temp_suffix = gen_temp_suffix(&check)?;
6391    let name_temp = format!("mz_schema_swap_{temp_suffix}");
6392
6393    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6394        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6395        schema_a_name: schema_a.name().schema.to_string(),
6396        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6397        schema_b_name: schema_b.name().schema.to_string(),
6398        name_temp,
6399    }))
6400}
6401
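/// Plans renaming a single catalog item, e.g. `ALTER VIEW v RENAME TO v2`
/// (illustrative syntax), rejecting names that would conflict with existing
/// items or types.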
6402pub fn plan_alter_item_rename(
6403    scx: &mut StatementContext,
6404    object_type: ObjectType,
6405    name: UnresolvedItemName,
6406    to_item_name: Ident,
6407    if_exists: bool,
6408) -> Result<Plan, PlanError> {
6409    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6410        Ok(r) => r,
6411        // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6412        Err(PlanError::MismatchedObjectType {
6413            name,
6414            is_type: ObjectType::MaterializedView,
6415            expected_type: ObjectType::View,
6416        }) => {
6417            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6418        }
6419        e => e?,
6420    };
6421
6422    match resolved {
6423        Some(entry) => {
6424            let full_name = scx.catalog.resolve_full_name(entry.name());
6425            let item_type = entry.item_type();
6426
6427            let proposed_name = QualifiedItemName {
6428                qualifiers: entry.name().qualifiers.clone(),
6429                item: to_item_name.clone().into_string(),
6430            };
6431
6432            // For PostgreSQL compatibility, items and types cannot have
6433            // overlapping names in a variety of situations. See the comment on
6434            // `CatalogItemType::conflicts_with_type` for details.
6435            let conflicting_type_exists;
6436            let conflicting_item_exists;
6437            if item_type == CatalogItemType::Type {
6438                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6439                conflicting_item_exists = scx
6440                    .catalog
6441                    .get_item_by_name(&proposed_name)
6442                    .map(|item| item.item_type().conflicts_with_type())
6443                    .unwrap_or(false);
6444            } else {
6445                conflicting_type_exists = item_type.conflicts_with_type()
6446                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6447                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6448            };
6449            if conflicting_type_exists || conflicting_item_exists {
6450                sql_bail!("catalog item '{}' already exists", to_item_name);
6451            }
6452
6453            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6454                id: entry.id(),
6455                current_full_name: full_name,
6456                to_name: normalize::ident(to_item_name),
6457                object_type,
6458            }))
6459        }
6460        None => {
6461            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6462                name: name.to_ast_string_simple(),
6463                object_type,
6464            });
6465
6466            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6467        }
6468    }
6469}
6470
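/// Plans `ALTER CLUSTER ... RENAME TO`, e.g. `ALTER CLUSTER c RENAME TO c2`
/// (illustrative syntax).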
6471pub fn plan_alter_cluster_rename(
6472    scx: &mut StatementContext,
6473    object_type: ObjectType,
6474    name: Ident,
6475    to_name: Ident,
6476    if_exists: bool,
6477) -> Result<Plan, PlanError> {
6478    match resolve_cluster(scx, &name, if_exists)? {
6479        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6480            id: entry.id(),
6481            name: entry.name().to_string(),
6482            to_name: ident(to_name),
6483        })),
6484        None => {
6485            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6486                name: name.to_ast_string_simple(),
6487                object_type,
6488            });
6489
6490            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6491        }
6492    }
6493}
6494
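/// Plans `ALTER CLUSTER ... SWAP WITH`, e.g. `ALTER CLUSTER a SWAP WITH b`
/// (illustrative syntax), using the same temporary-name scheme as schema swaps.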
6495pub fn plan_alter_cluster_swap<F>(
6496    scx: &mut StatementContext,
6497    name_a: Ident,
6498    name_b: Ident,
6499    gen_temp_suffix: F,
6500) -> Result<Plan, PlanError>
6501where
6502    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6503{
6504    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6505    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6506
6507    let check = |temp_suffix: &str| {
6508        let mut temp_name = ident!("mz_cluster_swap_");
6509        temp_name.append_lossy(temp_suffix);
6510        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6511            // Temp name does not exist, so we can use it.
6512            Err(CatalogError::UnknownCluster(_)) => true,
6513            // Temp name already exists, or resolution failed for another reason.
6514            Ok(_) | Err(_) => false,
6515        }
6516    };
6517    let temp_suffix = gen_temp_suffix(&check)?;
6518    let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6519
6520    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6521        id_a: cluster_a.id(),
6522        id_b: cluster_b.id(),
6523        name_a: name_a.into_string(),
6524        name_b: name_b.into_string(),
6525        name_temp,
6526    }))
6527}
6528
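/// Plans `ALTER CLUSTER REPLICA ... RENAME TO`, e.g.
/// `ALTER CLUSTER REPLICA c.r1 RENAME TO r2` (illustrative syntax); only
/// replicas of unmanaged clusters may be renamed.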
6529pub fn plan_alter_cluster_replica_rename(
6530    scx: &mut StatementContext,
6531    object_type: ObjectType,
6532    name: QualifiedReplica,
6533    to_item_name: Ident,
6534    if_exists: bool,
6535) -> Result<Plan, PlanError> {
6536    match resolve_cluster_replica(scx, &name, if_exists)? {
6537        Some((cluster, replica)) => {
6538            ensure_cluster_is_not_managed(scx, cluster.id())?;
6539            Ok(Plan::AlterClusterReplicaRename(
6540                AlterClusterReplicaRenamePlan {
6541                    cluster_id: cluster.id(),
6542                    replica_id: replica,
6543                    name: QualifiedReplica {
6544                        cluster: Ident::new(cluster.name())?,
6545                        replica: name.replica,
6546                    },
6547                    to_name: normalize::ident(to_item_name),
6548                },
6549            ))
6550        }
6551        None => {
6552            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6553                name: name.to_ast_string_simple(),
6554                object_type,
6555            });
6556
6557            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6558        }
6559    }
6560}
6561
6562pub fn describe_alter_object_swap(
6563    _: &StatementContext,
6564    _: AlterObjectSwapStatement,
6565) -> Result<StatementDesc, PlanError> {
6566    Ok(StatementDesc::new(None))
6567}
6568
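/// Plans `ALTER ... SWAP WITH`, e.g. `ALTER SCHEMA a SWAP WITH b` (illustrative
/// syntax); only schemas and clusters support swapping.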
6569pub fn plan_alter_object_swap(
6570    scx: &mut StatementContext,
6571    stmt: AlterObjectSwapStatement,
6572) -> Result<Plan, PlanError> {
6573    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6574
6575    let AlterObjectSwapStatement {
6576        object_type,
6577        name_a,
6578        name_b,
6579    } = stmt;
6580    let object_type = object_type.into();
6581
6582    // We'll try 10 times to generate a temporary suffix.
6583    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6584        let mut attempts = 0;
6585        let name_temp = loop {
6586            attempts += 1;
6587            if attempts > 10 {
6588                tracing::warn!("Unable to generate temp id for swapping");
6589                sql_bail!("unable to generate a unique temporary name for swapping");
6590            }
6591
6592            // Call the provided closure to make sure this name is unique!
6593            let short_id = mz_ore::id_gen::temp_id();
6594            if check_fn(&short_id) {
6595                break short_id;
6596            }
6597        };
6598
6599        Ok(name_temp)
6600    };
6601
6602    match (object_type, name_a, name_b) {
6603        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6604            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6605        }
6606        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6607            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6608        }
6609        (object_type, _, _) => Err(PlanError::Unsupported {
6610            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6611            discussion_no: None,
6612        }),
6613    }
6614}
6615
6616pub fn describe_alter_retain_history(
6617    _: &StatementContext,
6618    _: AlterRetainHistoryStatement<Aug>,
6619) -> Result<StatementDesc, PlanError> {
6620    Ok(StatementDesc::new(None))
6621}
6622
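/// Plans `ALTER ... SET (RETAIN HISTORY ...)`, e.g.
/// `ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '7 days')` (illustrative
/// syntax).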
6623pub fn plan_alter_retain_history(
6624    scx: &StatementContext,
6625    AlterRetainHistoryStatement {
6626        object_type,
6627        if_exists,
6628        name,
6629        history,
6630    }: AlterRetainHistoryStatement<Aug>,
6631) -> Result<Plan, PlanError> {
6632    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6633}
6634
6635fn alter_retain_history(
6636    scx: &StatementContext,
6637    object_type: ObjectType,
6638    if_exists: bool,
6639    name: UnresolvedObjectName,
6640    history: Option<WithOptionValue<Aug>>,
6641) -> Result<Plan, PlanError> {
6642    let name = match (object_type, name) {
6643        (
6644            // View gets a special error below.
6645            ObjectType::View
6646            | ObjectType::MaterializedView
6647            | ObjectType::Table
6648            | ObjectType::Source
6649            | ObjectType::Index,
6650            UnresolvedObjectName::Item(name),
6651        ) => name,
6652        (object_type, _) => {
6653            sql_bail!("{object_type} does not support RETAIN HISTORY")
6654        }
6655    };
6656    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6657        Some(entry) => {
6658            let full_name = scx.catalog.resolve_full_name(entry.name());
6659            let item_type = entry.item_type();
6660
6661            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6662            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6663                return Err(PlanError::AlterViewOnMaterializedView(
6664                    full_name.to_string(),
6665                ));
6666            } else if object_type == ObjectType::View {
6667                sql_bail!("{object_type} does not support RETAIN HISTORY")
6668            } else if object_type != item_type {
6669                sql_bail!(
6670                    "\"{}\" is a {} not a {}",
6671                    full_name,
6672                    entry.item_type(),
6673                    format!("{object_type}").to_lowercase()
6674                )
6675            }
6676
6677            // Save the original value so we can record it in the catalog item's create_sql.
6678            let (value, lcw) = match &history {
6679                Some(WithOptionValue::RetainHistoryFor(value)) => {
6680                    let window = OptionalDuration::try_from_value(value.clone())?;
6681                    (Some(value.clone()), window.0)
6682                }
6683                // None means RESET, so use the default compaction window.
6684                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6685                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6686            };
6687            let window = plan_retain_history(scx, lcw)?;
6688
6689            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6690                id: entry.id(),
6691                value,
6692                window,
6693                object_type,
6694            }))
6695        }
6696        None => {
6697            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6698                name: name.to_ast_string_simple(),
6699                object_type,
6700            });
6701
6702            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6703        }
6704    }
6705}
6706
6707pub fn describe_alter_secret_options(
6708    _: &StatementContext,
6709    _: AlterSecretStatement<Aug>,
6710) -> Result<StatementDesc, PlanError> {
6711    Ok(StatementDesc::new(None))
6712}
6713
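/// Plans `ALTER SECRET ... AS`, e.g. `ALTER SECRET s AS 'new-value'`
/// (illustrative syntax).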
6714pub fn plan_alter_secret(
6715    scx: &mut StatementContext,
6716    stmt: AlterSecretStatement<Aug>,
6717) -> Result<Plan, PlanError> {
6718    let AlterSecretStatement {
6719        name,
6720        if_exists,
6721        value,
6722    } = stmt;
6723    let object_type = ObjectType::Secret;
6724    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6725        Some(entry) => entry.id(),
6726        None => {
6727            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6728                name: name.to_string(),
6729                object_type,
6730            });
6731
6732            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6733        }
6734    };
6735
6736    let secret_as = query::plan_secret_as(scx, value)?;
6737
6738    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6739}
6740
6741pub fn describe_alter_connection(
6742    _: &StatementContext,
6743    _: AlterConnectionStatement<Aug>,
6744) -> Result<StatementDesc, PlanError> {
6745    Ok(StatementDesc::new(None))
6746}
6747
6748generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6749
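/// Plans an `ALTER CONNECTION` statement, e.g.
/// `ALTER CONNECTION conn SET (PORT 5433) WITH (VALIDATE = true)` (illustrative
/// syntax). `ROTATE KEYS` must be the only action when it is present.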
6750pub fn plan_alter_connection(
6751    scx: &StatementContext,
6752    stmt: AlterConnectionStatement<Aug>,
6753) -> Result<Plan, PlanError> {
6754    let AlterConnectionStatement {
6755        name,
6756        if_exists,
6757        actions,
6758        with_options,
6759    } = stmt;
6760    let conn_name = normalize::unresolved_item_name(name)?;
6761    let entry = match scx.catalog.resolve_item(&conn_name) {
6762        Ok(entry) => entry,
6763        Err(_) if if_exists => {
6764            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6765                name: conn_name.to_string(),
6766                object_type: ObjectType::Connection,
6767            });
6768
6769            return Ok(Plan::AlterNoop(AlterNoopPlan {
6770                object_type: ObjectType::Connection,
6771            }));
6772        }
6773        Err(e) => return Err(e.into()),
6774    };
6775
6776    let connection = entry.connection()?;
6777
6778    if actions
6779        .iter()
6780        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6781    {
6782        if actions.len() > 1 {
6783            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6784        }
6785
6786        if !with_options.is_empty() {
6787            sql_bail!(
6788                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6789                with_options
6790                    .iter()
6791                    .map(|o| o.to_ast_string_simple())
6792                    .join(", ")
6793            );
6794        }
6795
6796        if !matches!(connection, Connection::Ssh(_)) {
6797            sql_bail!(
6798                "{} is not an SSH connection",
6799                scx.catalog.resolve_full_name(entry.name())
6800            )
6801        }
6802
6803        return Ok(Plan::AlterConnection(AlterConnectionPlan {
6804            id: entry.id(),
6805            action: crate::plan::AlterConnectionAction::RotateKeys,
6806        }));
6807    }
6808
6809    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6810    if options.validate.is_some() {
6811        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6812    }
6813
6814    let validate = match options.validate {
6815        Some(val) => val,
6816        None => {
6817            scx.catalog
6818                .system_vars()
6819                .enable_default_connection_validation()
6820                && connection.validate_by_default()
6821        }
6822    };
6823
6824    let connection_type = match connection {
6825        Connection::Aws(_) => CreateConnectionType::Aws,
6826        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6827        Connection::Kafka(_) => CreateConnectionType::Kafka,
6828        Connection::Csr(_) => CreateConnectionType::Csr,
6829        Connection::Postgres(_) => CreateConnectionType::Postgres,
6830        Connection::Ssh(_) => CreateConnectionType::Ssh,
6831        Connection::MySql(_) => CreateConnectionType::MySql,
6832        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6833    };
6834
6835    // Collect all options irrespective of action taken on them.
6836    let specified_options: BTreeSet<_> = actions
6837        .iter()
6838        .map(|action: &AlterConnectionAction<Aug>| match action {
6839            AlterConnectionAction::SetOption(option) => option.name.clone(),
6840            AlterConnectionAction::DropOption(name) => name.clone(),
6841            AlterConnectionAction::RotateKeys => unreachable!(),
6842        })
6843        .collect();
6844
6845    for invalid in INALTERABLE_OPTIONS {
6846        if specified_options.contains(invalid) {
6847            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6848        }
6849    }
6850
6851    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition operations into set and drop
    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();
    // Type check the values and detect duplicates. We don't want to let users,
    // e.g., both drop and set the same option in one statement, so it's fine to
    // treat the drops as sets for this check.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }
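    // For illustration (hypothetical option): `ALTER CONNECTION conn SET (HOST =
    // 'a'), DROP (HOST);` is rejected here because HOST appears in both the SET
    // and DROP lists.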

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow setting _and_ resetting mutually exclusive options
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }
        // If any option in the set is either set or dropped, drop all of its
        // mutually exclusive options. We do this "behind the scenes" (even
        // though users may not SET and DROP the same option themselves) because
        // dropping the exclusive options is the mechanism by which we overwrite
        // values elsewhere in the code.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

        // n.b. if mutually exclusive options are set, those will error when we
        // try to replan the connection.
    }
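    // For illustration (hypothetical option names): if OPTION_A and OPTION_B were
    // mutually exclusive, `SET (OPTION_A = ...)` would implicitly drop OPTION_B,
    // so the new value replaces the old configuration instead of conflicting.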

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always ALTER objects from their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // First we reconstruct the original CREATE SINK statement
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(mut stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // And then we find the existing version of the sink and increase it by one
            let cur_version = stmt
                .with_options
                .drain_filter_swapping(|o| o.name == CreateSinkOptionName::Version)
                .map(|o| u64::try_from_value(o.value).expect("invalid sink create_sql"))
                .max()
                .unwrap_or(0);
            let new_version = cur_version + 1;
            stmt.with_options.push(CreateSinkOption {
                name: CreateSinkOptionName::Version,
                value: Some(WithOptionValue::Value(Value::Number(
                    new_version.to_string(),
                ))),
            });
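            // For illustration: a sink whose `create_sql` carries no VERSION
            // option is treated as version 0, so its first `ALTER SINK` records
            // VERSION 1 in the rewritten statement.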

            // Then resolve the statement and swap in the new `from` relation
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            // Finally, re-plan the modified CREATE SINK statement to verify the new configuration is valid
            let Plan::CreateSink(plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET options"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    // TODO: Should the source's options be described here?
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
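// N.B. the macro above generates `AlterSourceAddSubsourceOptionExtracted`, a
// struct with one typed field per option and a `TryFrom` conversion, mirroring
// how `AlterConnectionOptionExtracted` is used in `plan_alter_connection`.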

pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            // N.B. we use this statement in purification in a way that cannot be
            // planned directly.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
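        // For illustration (hypothetical source name): `ALTER SOURCE src SET
        // (RETAIN HISTORY FOR '2h');` takes the branch above; setting any other
        // source option is rejected at planning time.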
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE is no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

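// For illustration (hypothetical variable value): `ALTER SYSTEM SET max_tables =
// 1000;` becomes `AlterSystemSetPlan { name: "max_tables", value: ... }`, with
// value parsing delegated to `scl::plan_set_variable_to`.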
pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

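// For illustration (hypothetical role name): `ALTER ROLE dev SET cluster =
// 'compute';` takes the `Variable` branch below, while attribute changes such as
// `ALTER ROLE dev WITH INHERIT;` take the `Attributes` branch.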
pub fn plan_alter_role(
    _scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

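// For illustration (hypothetical table and column): `ALTER TABLE t ADD COLUMN IF
// NOT EXISTS c text;` plans an `AlterTablePlan`; note that new columns are
// currently always nullable (see the TODO below).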
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                // Always add columns to the latest version of the item.
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // TODO(alter_table): Support non-nullable columns with default values.
    let column_type = scalar_type.nullable(true);
    // "Unresolve" our data type so we can later update the persisted create_sql.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

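// For illustration (hypothetical objects): `COMMENT ON COLUMN t.c IS 'docs';`
// resolves to the owning relation's ID plus a 1-based column position, while
// `COMMENT ON TABLE t IS NULL;` clears the comment (`comment` is `None`).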
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // TODO(parkmycar): Make max comment length configurable.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        // Without this arm, a mistyped `COMMENT ON CONTINUAL
                        // TASK` would hit the `unreachable!` below.
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // Note: the `mz_comments` table uses an `Int4` for the column position, but
    // in catalog storage we keep a `usize`, which maps to a `UInt8`. We check
    // that the conversion is safe here because it's the easiest place to raise
    // an error.
    //
    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

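// The resolver helpers below implement `IF EXISTS` semantics: when the named
// object cannot be resolved and `if_exists` is set, they return `Ok(None)` so
// callers can plan an `AlterNoop` instead of raising an error.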
pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns an error if the given cluster is a managed cluster.
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}
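// For illustration (hypothetical cluster name): replica-level DDL such as
// `CREATE CLUSTER REPLICA managed.r2 ...` is rejected on managed clusters via
// `PlanError::ManagedCluster`, since managed clusters own their replica sets.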