1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::{Either, Itertools};
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::soft_panic_or_log;
32use mz_ore::str::StrExt;
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43 preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48 AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49 AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50 AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51 AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52 AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53 AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54 AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55 ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56 ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57 ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58 ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
59 CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
60 CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
61 CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
62 CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
63 CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
64 CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
65 CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
66 CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
67 CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
68 CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
69 CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
70 CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
71 DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
72 FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
73 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90 IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91 SinkEnvelope, StorageSinkConnection,
92};
93use mz_storage_types::sources::encoding::{
94 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95 SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111 PostgresSourceConnection, PostgresSourcePublicationDetails,
112 ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121 SqlServerSourceExtras, Timeline,
122};
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133 Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140 CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
141 scalar_type_from_sql,
142};
143use crate::plan::scope::Scope;
144use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
145use crate::plan::statement::{StatementContext, StatementDesc, scl};
146use crate::plan::typeconv::CastContext;
147use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
148use crate::plan::{
149 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
150 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
151 AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
152 AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
153 AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
154 AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
155 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
156 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
157 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
158 CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
159 CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
160 CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
161 CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
162 MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
163 PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
164 Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
165 WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
166 transform_ast,
167};
168use crate::session::vars::{
169 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
170 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
171 ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS,
172};
173use crate::{names, parse};
174
175mod connection;
176
/// Maximum number of columns permitted in a planned relation; enforced below,
/// e.g. when assembling the column set of a webhook source.
const MAX_NUM_COLUMNS: usize = 256;

/// Matches replica names of the form `r<digits>` (e.g. `r1`, `r42`).
// NOTE(review): the pattern uses `(\d)+` (a repeated one-digit group) rather
// than `(\d+)`. The set of accepted strings is identical, but the capture
// group only retains the *last* digit — confirm no caller reads the capture;
// if one does, the grouping should be `(\d+)`.
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
185
186fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
190 if partition_by.len() > desc.len() {
191 tracing::error!(
192 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
193 desc.len(),
194 partition_by.len()
195 );
196 partition_by.truncate(desc.len());
197 }
198
199 let desc_prefix = desc.iter().take(partition_by.len());
200 for (idx, ((desc_name, desc_type), partition_name)) in
201 desc_prefix.zip_eq(partition_by).enumerate()
202 {
203 let partition_name = normalize::column_name(partition_name);
204 if *desc_name != partition_name {
205 sql_bail!(
206 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
207 );
208 }
209 if !preserves_order(&desc_type.scalar_type) {
210 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
211 }
212 }
213 Ok(())
214}
215
216pub fn describe_create_database(
217 _: &StatementContext,
218 _: CreateDatabaseStatement,
219) -> Result<StatementDesc, PlanError> {
220 Ok(StatementDesc::new(None))
221}
222
223pub fn plan_create_database(
224 _: &StatementContext,
225 CreateDatabaseStatement {
226 name,
227 if_not_exists,
228 }: CreateDatabaseStatement,
229) -> Result<Plan, PlanError> {
230 Ok(Plan::CreateDatabase(CreateDatabasePlan {
231 name: normalize::ident(name.0),
232 if_not_exists,
233 }))
234}
235
236pub fn describe_create_schema(
237 _: &StatementContext,
238 _: CreateSchemaStatement,
239) -> Result<StatementDesc, PlanError> {
240 Ok(StatementDesc::new(None))
241}
242
243pub fn plan_create_schema(
244 scx: &StatementContext,
245 CreateSchemaStatement {
246 mut name,
247 if_not_exists,
248 }: CreateSchemaStatement,
249) -> Result<Plan, PlanError> {
250 if name.0.len() > 2 {
251 sql_bail!("schema name {} has more than two components", name);
252 }
253 let schema_name = normalize::ident(
254 name.0
255 .pop()
256 .expect("names always have at least one component"),
257 );
258 let database_spec = match name.0.pop() {
259 None => match scx.catalog.active_database() {
260 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
261 None => sql_bail!("no database specified and no active database"),
262 },
263 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
264 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
265 Err(_) => sql_bail!("invalid database {}", n.as_str()),
266 },
267 };
268 Ok(Plan::CreateSchema(CreateSchemaPlan {
269 database_spec,
270 schema_name,
271 if_not_exists,
272 }))
273}
274
275pub fn describe_create_table(
276 _: &StatementContext,
277 _: CreateTableStatement<Aug>,
278) -> Result<StatementDesc, PlanError> {
279 Ok(StatementDesc::new(None))
280}
281
/// Plans a `CREATE TABLE` statement into a [`Plan::CreateTable`].
///
/// Columns carrying a `Versioned` option are excluded from the root relation
/// description and instead recorded as later schema versions of a
/// [`VersionedRelationDesc`]. Unique/primary-key column options and table
/// constraints are collected into candidate keys (all key support is gated
/// behind `UNSAFE_ENABLE_TABLE_KEYS`), and `WITH` options such as
/// `RETAIN HISTORY` are planned against the table's root version.
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    // Names of the *root-version* columns only: versioned columns are added
    // later via `changes` rather than appearing in the initial desc.
    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    // Schema changes keyed by relation version: (action, column name, type).
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    // NOTE(review): `i` enumerates *all* columns, including versioned ones,
    // while `names`/`column_types` skip versioned columns. The indices pushed
    // into `keys` therefore only line up if versioned columns follow all
    // root-version columns — confirm purification guarantees this ordering.
    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Plan the default expression purely for validation; the
                    // (transformed) AST expression itself is what is stored.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    // A single-column unique constraint; primary keys also
                    // force the column NOT NULL.
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    // Record this column as a schema change at `version`
                    // instead of including it in the root description.
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        // Defaults are recorded for every column, versioned or not.
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                // PRIMARY KEY columns are implicitly NOT NULL.
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A plain UNIQUE constraint over a nullable
                                // column (without NULLS NOT DISTINCT) is not a
                                // usable key.
                                // NOTE(review): `break 'c` abandons *all*
                                // remaining constraints, not just this one —
                                // confirm this is intended (a `continue 'c`
                                // would skip only the current constraint).
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // Primary keys sort to the front of the key list.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Accepted only for parsing round-trips; gated and unenforced.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Accepted only for parsing round-trips; gated and unenforced.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    // Temporary tables live in a session-scoped namespace.
    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Reject name collisions unless IF NOT EXISTS was specified.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Replay the recorded schema changes in version order; each must land on
    // exactly the version the user declared, otherwise the statement is
    // internally inconsistent.
    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // WITH options are planned against the root version of the relation.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    // RETAIN HISTORY, if present, becomes the table's compaction window.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
501
502pub fn describe_create_table_from_source(
503 _: &StatementContext,
504 _: CreateTableFromSourceStatement<Aug>,
505) -> Result<StatementDesc, PlanError> {
506 Ok(StatementDesc::new(None))
507}
508
509pub fn describe_create_webhook_source(
510 _: &StatementContext,
511 _: CreateWebhookSourceStatement<Aug>,
512) -> Result<StatementDesc, PlanError> {
513 Ok(StatementDesc::new(None))
514}
515
516pub fn describe_create_source(
517 _: &StatementContext,
518 _: CreateSourceStatement<Aug>,
519) -> Result<StatementDesc, PlanError> {
520 Ok(StatementDesc::new(None))
521}
522
523pub fn describe_create_subsource(
524 _: &StatementContext,
525 _: CreateSubsourceStatement<Aug>,
526) -> Result<StatementDesc, PlanError> {
527 Ok(StatementDesc::new(None))
528}
529
// Typed extractors for `WITH (...)` option lists, generated by
// `generate_extracted_config!`. Each invocation produces a
// `<Name>Extracted` struct with one field per option (plus `seen`),
// convertible from the raw AST option vector via `TryFrom`.

// Options accepted on `CREATE SOURCE` itself.
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Connection-level options for PostgreSQL sources; `Publication` is specific
// to PostgreSQL, while `Details`/`TextColumns`/`ExcludeColumns` mirror the
// MySQL and SQL Server extractors below.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Connection-level options for MySQL sources.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Connection-level options for SQL Server sources.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
557
/// Plans a `CREATE SOURCE ... FROM WEBHOOK` (or the table-returning variant)
/// into a [`Plan::CreateSource`] or [`Plan::CreateTable`].
///
/// The resulting relation always has a `body` column whose type follows the
/// declared BODY FORMAT, optionally a `headers` map column (filtered by
/// allow/block lists), and one additional column per `INCLUDE HEADER ... AS`
/// mapping.
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    // The table-returning variant is gated behind a feature flag.
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    // Unless the dyncfg allows multi-replica sources, webhook sources may only
    // live on clusters with at most one replica.
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    // Capture the normalized statement text before destructuring consumes it.
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // A validation expression that ignores its inputs would accept or
        // reject every request uniformly; reject it.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation must be deterministic; `now()` is the only
        // unmaterializable function permitted.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    // Only BYTES, JSON, and TEXT are valid webhook body formats.
    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    // Column 0 is always the request body, typed per the BODY FORMAT.
    let mut column_ty = vec![
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // `INCLUDE HEADERS` adds a `headers` map column, with optional
    // allow/block filters on header names.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Each `INCLUDE HEADER 'x' AS col` mapping becomes its own nullable
    // column (bytes or text), recorded by column index.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Internal invariant: the two parallel vectors stay in lock step.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Reject duplicate column names and over-wide relations.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Reject name collisions unless IF NOT EXISTS was specified.
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let timeline = Timeline::EpochMilliseconds;

    // The same webhook data source is wrapped either as a table or a source.
    // NOTE(review): the table variant embeds `cluster_id` in the data source
    // while the source variant leaves it `None` and records the cluster on the
    // plan's `in_cluster` field instead — confirm downstream consumers expect
    // this asymmetry.
    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}
755
/// Plans a `CREATE SOURCE` statement into a [`Plan::CreateSource`].
///
/// Validates envelope/format/metadata combinations, plans the upstream
/// connection, applies envelope and encoding to derive the relation
/// description, handles an optional `PRIMARY KEY ... NOT ENFORCED`
/// constraint, and assembles either the legacy (progress-subsource) or the
/// new ingestion data source.
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    // Purification is expected to have rewritten away any explicit external
    // references before planning.
    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // When the new table-from-source syntax is forced, the legacy
    // ENVELOPE/FORMAT/INCLUDE clauses on CREATE SOURCE are disallowed.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    // INCLUDE HEADERS is Kafka-only; other INCLUDE metadata is limited to
    // Kafka and load-generator sources, and only under certain envelopes.
    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    // Plan the system-specific connection (Kafka, Postgres, MySQL, SQL
    // Server, or load generator).
    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    // Only Kafka sources contribute extra metadata columns to the desc.
    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Derive the relation description plus the planned envelope and encoding
    // from the declared FORMAT/ENVELOPE and connection defaults.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    // Apply any user-specified column aliases.
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // PRIMARY KEY ... NOT ENFORCED: a user-asserted (unverified) key, behind a
    // feature flag. Columns must be distinct, resolvable, and unambiguous, and
    // the desc must not already carry keys from the envelope.
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    // Resolve the timestamp interval, enforcing system bounds only when a
    // planning context is present (i.e. for user-issued statements).
    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            if scx.pcx.is_some() {
                let min = scx.catalog.system_vars().min_timestamp_interval();
                let max = scx.catalog.system_vars().max_timestamp_interval();
                if duration < min || duration > max {
                    return Err(PlanError::InvalidTimestampInterval {
                        min,
                        max,
                        requested: duration,
                    });
                }
            }
            duration
        }
        None => scx.catalog.system_vars().default_timestamp_interval(),
    };

    // A progress subsource marks the legacy (pre table-from-source) syntax:
    // the envelope-derived desc is kept and per-export details are attached.
    // Without one, the source itself only exposes the progress relation.
    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    // Multi-output generators emit no single default export.
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            // New syntax: the source's own relation is the timestamp/progress
            // desc; data lives in separately created tables.
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Reject name collisions unless IF NOT EXISTS was specified.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // CDCv2 sources carry their own timestamps, so they get a dedicated
    // external timeline; everything else uses the system timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1024
1025pub fn plan_generic_source_connection(
1026 scx: &StatementContext<'_>,
1027 source_connection: &CreateSourceConnection<Aug>,
1028 include_metadata: &Vec<SourceIncludeMetadata>,
1029) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1030 Ok(match source_connection {
1031 CreateSourceConnection::Kafka {
1032 connection,
1033 options,
1034 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1035 scx,
1036 connection,
1037 options,
1038 include_metadata,
1039 )?),
1040 CreateSourceConnection::Postgres {
1041 connection,
1042 options,
1043 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1044 scx, connection, options,
1045 )?),
1046 CreateSourceConnection::SqlServer {
1047 connection,
1048 options,
1049 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1050 scx, connection, options,
1051 )?),
1052 CreateSourceConnection::MySql {
1053 connection,
1054 options,
1055 } => {
1056 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1057 }
1058 CreateSourceConnection::LoadGenerator { generator, options } => {
1059 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1060 scx,
1061 generator,
1062 options,
1063 include_metadata,
1064 )?)
1065 }
1066 })
1067}
1068
1069fn plan_load_generator_source_connection(
1070 scx: &StatementContext<'_>,
1071 generator: &ast::LoadGenerator,
1072 options: &Vec<LoadGeneratorOption<Aug>>,
1073 include_metadata: &Vec<SourceIncludeMetadata>,
1074) -> Result<LoadGeneratorSourceConnection, PlanError> {
1075 let load_generator =
1076 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1077 let LoadGeneratorOptionExtracted {
1078 tick_interval,
1079 as_of,
1080 up_to,
1081 ..
1082 } = options.clone().try_into()?;
1083 let tick_micros = match tick_interval {
1084 Some(interval) => Some(interval.as_micros().try_into()?),
1085 None => None,
1086 };
1087 if up_to < as_of {
1088 sql_bail!("UP TO cannot be less than AS OF");
1089 }
1090 Ok(LoadGeneratorSourceConnection {
1091 load_generator,
1092 tick_micros,
1093 as_of,
1094 up_to,
1095 })
1096}
1097
1098fn plan_mysql_source_connection(
1099 scx: &StatementContext<'_>,
1100 connection: &ResolvedItemName,
1101 options: &Vec<MySqlConfigOption<Aug>>,
1102) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1103 let connection_item = scx.get_item_by_resolved_name(connection)?;
1104 match connection_item.connection()? {
1105 Connection::MySql(connection) => connection,
1106 _ => sql_bail!(
1107 "{} is not a MySQL connection",
1108 scx.catalog.resolve_full_name(connection_item.name())
1109 ),
1110 };
1111 let MySqlConfigOptionExtracted {
1112 details,
1113 text_columns: _,
1117 exclude_columns: _,
1118 seen: _,
1119 } = options.clone().try_into()?;
1120 let details = details
1121 .as_ref()
1122 .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
1123 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1124 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1125 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1126 Ok(MySqlSourceConnection {
1127 connection: connection_item.id(),
1128 connection_id: connection_item.id(),
1129 details,
1130 })
1131}
1132
1133fn plan_sqlserver_source_connection(
1134 scx: &StatementContext<'_>,
1135 connection: &ResolvedItemName,
1136 options: &Vec<SqlServerConfigOption<Aug>>,
1137) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1138 let connection_item = scx.get_item_by_resolved_name(connection)?;
1139 match connection_item.connection()? {
1140 Connection::SqlServer(connection) => connection,
1141 _ => sql_bail!(
1142 "{} is not a SQL Server connection",
1143 scx.catalog.resolve_full_name(connection_item.name())
1144 ),
1145 };
1146 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1147 let details = details
1148 .as_ref()
1149 .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
1150 let extras = hex::decode(details)
1151 .map_err(|e| sql_err!("{e}"))
1152 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1153 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1154 Ok(SqlServerSourceConnection {
1155 connection_id: connection_item.id(),
1156 connection: connection_item.id(),
1157 extras,
1158 })
1159}
1160
1161fn plan_postgres_source_connection(
1162 scx: &StatementContext<'_>,
1163 connection: &ResolvedItemName,
1164 options: &Vec<PgConfigOption<Aug>>,
1165) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1166 let connection_item = scx.get_item_by_resolved_name(connection)?;
1167 let PgConfigOptionExtracted {
1168 details,
1169 publication,
1170 text_columns: _,
1174 exclude_columns: _,
1178 seen: _,
1179 } = options.clone().try_into()?;
1180 let details = details
1181 .as_ref()
1182 .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
1183 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1184 let details =
1185 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1186 let publication_details =
1187 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1188 Ok(PostgresSourceConnection {
1189 connection: connection_item.id(),
1190 connection_id: connection_item.id(),
1191 publication: publication.expect("validated exists during purification"),
1192 publication_details,
1193 })
1194}
1195
/// Plans a Kafka source connection.
///
/// Validates the referenced connection item, extracts the Kafka-specific
/// options, converts the `START OFFSET` list into a per-partition map, and
/// translates `INCLUDE <metadata>` clauses into `KafkaMetadataKind` columns.
fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    // The referenced item must be a Kafka connection.
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        // START TIMESTAMP is not consumed here — presumably it is resolved to
        // offsets during purification; TODO confirm.
        start_timestamp: _, start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    // The i-th entry of the START OFFSET list is the offset for partition i.
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    // Reject refresh intervals longer than one hour.
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    // Translate each INCLUDE clause into a (column name, metadata kind) pair.
    // An alias overrides the default column name. INCLUDE KEY yields no
    // metadata column here — keys are handled via the envelope machinery.
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // Handled by the key envelope, not as a metadata column.
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}
1290
/// Combines a source's envelope and format into a planned `SourceEnvelope`,
/// the resulting `RelationDesc`, and (if a format was given) the data
/// encoding.
///
/// When a format is present, the pre-decoding description produced by the
/// source must be a single `bytes` column; the format's own descriptions then
/// replace it. Metadata columns are appended via `included_column_desc`.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    // Plan the encoding only if a FORMAT clause was supplied.
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // Format decoding consumes raw bytes, so the source must produce
            // exactly one `bytes` column before decoding.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // For Kafka with ENVELOPE NONE, key columns are forced nullable —
            // presumably because messages may arrive without a key; TODO
            // confirm the rationale.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // KEY VALUE load generators provide a key without any key encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    // INCLUDE KEY is incompatible with ENVELOPE DEBEZIUM.
    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    // Plan the envelope itself.
    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            // Debezium requires the value to have the expected before/after
            // structure; a type error is reported as-is only for Avro values.
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            // UPSERT needs a key encoding unless the connection supplies keys
            // without one (KEY VALUE load generator).
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // The optional decode-error policy selects the upsert style; only
            // the feature-gated INLINE policy is supported beyond the default.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            // ENVELOPE MATERIALIZE is feature-gated and only supports bare
            // Avro formats.
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    // Fold the key, value, and metadata descriptions into the final relation
    // description for the planned envelope.
    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1469
/// Builds a `RelationDesc` for a source export (subsource or source table)
/// from user-declared columns and table constraints.
///
/// Column-level `NOT NULL`/`UNIQUE`/`PRIMARY KEY` options and table-level
/// `UNIQUE`/`PRIMARY KEY` constraints contribute keys; defaults, foreign
/// keys, and check constraints are unsupported.
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    // Reject duplicate column names up front.
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    // Plan each column's type and apply its column-level options.
    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        // Columns are nullable unless NOT NULL or PRIMARY KEY says otherwise.
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    // A single-column unique constraint is a key; PRIMARY KEY
                    // additionally implies NOT NULL.
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    // Apply table-level constraints. Only one PRIMARY KEY is allowed.
    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                // Resolve each constraint column to its index and collect the
                // key. PRIMARY KEY forces NOT NULL on its columns.
                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A nullable column in a plain UNIQUE
                                // constraint (without NULLS NOT DISTINCT)
                                // cannot be a key.
                                // NOTE(review): this `break 'c` abandons ALL
                                // remaining constraints, not just this one —
                                // presumably intentional, but worth confirming
                                // against the intended semantics.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // Primary keys sort first so they become the primary key set.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}
1577
// `WITH` options accepted by `CREATE SUBSOURCE`; expanded by
// `generate_extracted_config!` into `CreateSubsourceOptionExtracted`.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1587
/// Plans a `CREATE SUBSOURCE` statement.
///
/// A subsource is either a progress collection (`PROGRESS`) or an export of a
/// parent source (`EXTERNAL REFERENCE` + `OF SOURCE`); exactly one of the two
/// forms must be present.
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // Exactly one of the progress / export forms must be specified; anything
    // else indicates a malformed statement (planner invariant, hence assert).
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // When the forced source-table syntax is active, subsources of
        // sources are rejected in favor of CREATE TABLE ... FROM SOURCE.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        // `DETAILS` is populated during purification; decode the hex-encoded
        // protobuf into per-connection export details.
        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        // Subsources carry no envelope or encoding of their own.
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        // Unreachable given the assert above.
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}
1732
// `WITH` options accepted by `CREATE TABLE ... FROM SOURCE`; expanded by
// `generate_extracted_config!` into `TableFromSourceOptionExtracted`.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1741
/// Plans a `CREATE TABLE ... FROM SOURCE` statement, producing a table backed
/// by an ingestion export of an existing source.
///
/// Gated on the `enable_create_table_from_source` system variable.
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    // An omitted envelope defaults to ENVELOPE NONE.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // `DETAILS` is populated during purification; decode the hex-encoded
    // protobuf into per-connection export details.
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    // INCLUDE HEADERS is Kafka-only; other INCLUDE clauses are permitted for
    // Kafka and load generator sources only.
    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    // Translate the decoded statement details into planned export details.
    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            // INCLUDE <metadata> is only meaningful for a subset of envelopes.
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Translate INCLUDE clauses into (column name, metadata kind)
            // pairs; aliases override the default column names. INCLUDE KEY
            // is handled by the envelope machinery, not as a metadata column.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // Handled by the key envelope, not as a metadata
                        // column.
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    // User-defined columns/constraints replace the connection's default
    // descriptions; otherwise use the connection defaults.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Fold the envelope, format, and metadata columns into the final
    // description, planned envelope, and encoding.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    // Bare column-name lists rename the planned columns in order.
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // ENVELOPE MATERIALIZE (CdcV2) sources carry their own timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    // PARTITION BY is feature-gated and validated against the final desc.
    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2005
// Options accepted by load generator sources; expanded by
// `generate_extracted_config!` into `LoadGeneratorOptionExtracted`. Which
// options are valid for a given generator is enforced separately by
// `LoadGeneratorOptionExtracted::ensure_only_valid_options`.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2021
2022impl LoadGeneratorOptionExtracted {
2023 pub(super) fn ensure_only_valid_options(
2024 &self,
2025 loadgen: &ast::LoadGenerator,
2026 ) -> Result<(), PlanError> {
2027 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2028
2029 let mut options = self.seen.clone();
2030
2031 let permitted_options: &[_] = match loadgen {
2032 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2033 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2034 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2035 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2036 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2037 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2038 ast::LoadGenerator::KeyValue => &[
2039 TickInterval,
2040 Keys,
2041 SnapshotRounds,
2042 TransactionalSnapshot,
2043 ValueSize,
2044 Seed,
2045 Partitions,
2046 BatchSize,
2047 ],
2048 };
2049
2050 for o in permitted_options {
2051 options.remove(o);
2052 }
2053
2054 if !options.is_empty() {
2055 sql_bail!(
2056 "{} load generators do not support {} values",
2057 loadgen,
2058 options.iter().join(", ")
2059 )
2060 }
2061
2062 Ok(())
2063 }
2064}
2065
2066pub(crate) fn load_generator_ast_to_generator(
2067 scx: &StatementContext,
2068 loadgen: &ast::LoadGenerator,
2069 options: &[LoadGeneratorOption<Aug>],
2070 include_metadata: &[SourceIncludeMetadata],
2071) -> Result<LoadGenerator, PlanError> {
2072 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2073 extracted.ensure_only_valid_options(loadgen)?;
2074
2075 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2076 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2077 }
2078
2079 let load_generator = match loadgen {
2080 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2081 ast::LoadGenerator::Clock => {
2082 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2083 LoadGenerator::Clock
2084 }
2085 ast::LoadGenerator::Counter => {
2086 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2087 let LoadGeneratorOptionExtracted {
2088 max_cardinality, ..
2089 } = extracted;
2090 LoadGenerator::Counter { max_cardinality }
2091 }
2092 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2093 ast::LoadGenerator::Datums => {
2094 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2095 LoadGenerator::Datums
2096 }
2097 ast::LoadGenerator::Tpch => {
2098 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2099
2100 let sf: f64 = scale_factor.unwrap_or(0.01);
2102 if !sf.is_finite() || sf < 0.0 {
2103 sql_bail!("unsupported scale factor {sf}");
2104 }
2105
2106 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2107 let total = (sf * multiplier).floor();
2108 let mut i = i64::try_cast_from(total)
2109 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2110 if i < 1 {
2111 i = 1;
2112 }
2113 Ok(i)
2114 };
2115
2116 let count_supplier = f_to_i(10_000f64)?;
2119 let count_part = f_to_i(200_000f64)?;
2120 let count_customer = f_to_i(150_000f64)?;
2121 let count_orders = f_to_i(150_000f64 * 10f64)?;
2122 let count_clerk = f_to_i(1_000f64)?;
2123
2124 LoadGenerator::Tpch {
2125 count_supplier,
2126 count_part,
2127 count_customer,
2128 count_orders,
2129 count_clerk,
2130 }
2131 }
2132 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2133 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2134 let LoadGeneratorOptionExtracted {
2135 keys,
2136 snapshot_rounds,
2137 transactional_snapshot,
2138 value_size,
2139 tick_interval,
2140 seed,
2141 partitions,
2142 batch_size,
2143 ..
2144 } = extracted;
2145
2146 let mut include_offset = None;
2147 for im in include_metadata {
2148 match im {
2149 SourceIncludeMetadata::Offset { alias } => {
2150 include_offset = match alias {
2151 Some(alias) => Some(alias.to_string()),
2152 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2153 }
2154 }
2155 SourceIncludeMetadata::Key { .. } => continue,
2156
2157 _ => {
2158 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2159 }
2160 };
2161 }
2162
2163 let lgkv = KeyValueLoadGenerator {
2164 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2165 snapshot_rounds: snapshot_rounds
2166 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2167 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2169 value_size: value_size
2170 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2171 partitions: partitions
2172 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2173 tick_interval,
2174 batch_size: batch_size
2175 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2176 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2177 include_offset,
2178 };
2179
2180 if lgkv.keys == 0
2181 || lgkv.partitions == 0
2182 || lgkv.value_size == 0
2183 || lgkv.batch_size == 0
2184 {
2185 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2186 }
2187
2188 if lgkv.keys % lgkv.partitions != 0 {
2189 sql_bail!("KEYS must be a multiple of PARTITIONS")
2190 }
2191
2192 if lgkv.batch_size > lgkv.keys {
2193 sql_bail!("KEYS must be larger than BATCH SIZE")
2194 }
2195
2196 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2199 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2200 }
2201
2202 if lgkv.snapshot_rounds == 0 {
2203 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2204 }
2205
2206 LoadGenerator::KeyValue(lgkv)
2207 }
2208 };
2209
2210 Ok(load_generator)
2211}
2212
2213fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2214 let before = value_desc.get_by_name(&"before".into());
2215 let (after_idx, after_ty) = value_desc
2216 .get_by_name(&"after".into())
2217 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2218 let before_idx = if let Some((before_idx, before_ty)) = before {
2219 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2220 sql_bail!("'before' column must be of type record");
2221 }
2222 if before_ty != after_ty {
2223 sql_bail!("'before' type differs from 'after' column");
2224 }
2225 Some(before_idx)
2226 } else {
2227 None
2228 };
2229 Ok((before_idx, after_idx))
2230}
2231
2232fn get_encoding(
2233 scx: &StatementContext,
2234 format: &FormatSpecifier<Aug>,
2235 envelope: &ast::SourceEnvelope,
2236) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2237 let encoding = match format {
2238 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2239 FormatSpecifier::KeyValue { key, value } => {
2240 let key = {
2241 let encoding = get_encoding_inner(scx, key)?;
2242 Some(encoding.key.unwrap_or(encoding.value))
2243 };
2244 let value = get_encoding_inner(scx, value)?.value;
2245 SourceDataEncoding { key, value }
2246 }
2247 };
2248
2249 let requires_keyvalue = matches!(
2250 envelope,
2251 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2252 );
2253 let is_keyvalue = encoding.key.is_some();
2254 if requires_keyvalue && !is_keyvalue {
2255 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2256 };
2257
2258 Ok(encoding)
2259}
2260
2261fn source_sink_cluster_config<'a, 'ctx>(
2267 scx: &'a StatementContext<'ctx>,
2268 in_cluster: &mut Option<ResolvedClusterName>,
2269) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2270 let cluster = match in_cluster {
2271 None => {
2272 let cluster = scx.catalog.resolve_cluster(None)?;
2273 *in_cluster = Some(ResolvedClusterName {
2274 id: cluster.id(),
2275 print_name: None,
2276 });
2277 cluster
2278 }
2279 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2280 };
2281
2282 Ok(cluster)
2283}
2284
// Generates `AvroSchemaOptionExtracted`, used when planning inline Avro
// schemas to pull out `CONFLUENT WIRE FORMAT` (defaulting to `true`).
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2286
/// A planned Avro schema for a source: either an inline schema or one resolved
/// from a Confluent Schema Registry (CSR) connection.
#[derive(Debug)]
pub struct Schema {
    /// Schema for the key, when one was resolved (CSR seeds only; inline
    /// schemas never carry a key schema).
    pub key_schema: Option<String>,
    /// Schema for the value; always present.
    pub value_schema: String,
    /// Schemas referenced by the key schema. Empty for inline schemas.
    pub key_reference_schemas: Vec<String>,
    /// Schemas referenced by the value schema. Empty for inline schemas.
    pub value_reference_schemas: Vec<String>,
    /// The CSR connection the schemas came from, if any.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether messages are framed in the Confluent wire format. Always `true`
    /// for CSR-resolved schemas; configurable for inline schemas.
    pub confluent_wire_format: bool,
}
2298
/// Plans a single `FORMAT` clause into a [`SourceDataEncoding`].
///
/// Most formats describe only a value encoding (the returned `key` is `None`).
/// Two cases produce a key/value pair directly via early `return`s: an Avro
/// CSR seed that carries a key schema, and a Protobuf CSR seed that carries a
/// key descriptor.
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            // Normalize both the inline and CSR cases into a single `Schema`.
            let Schema {
                key_schema,
                value_schema,
                key_reference_schemas,
                value_reference_schemas,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    // The referenced connection must actually be a CSR
                    // connection.
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    // Purification fills in the seed; a missing seed here is a
                    // planner bug.
                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            key_reference_schemas: seed.key_reference_schemas.clone(),
                            value_reference_schemas: seed.value_reference_schemas.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            // A key schema implies a full key/value encoding pair; return it
            // directly rather than falling through to the value-only path.
            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        reference_schemas: key_reference_schemas,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        reference_schemas: value_reference_schemas,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    reference_schemas: value_reference_schemas,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                // As with Avro, purification must already have resolved the
                // seed; otherwise this is a planner bug.
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    // Validate the connection type even though only the seed's
                    // descriptors are used below.
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    // A key descriptor in the seed yields a key/value pair.
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    // Purification is expected to have filled in header names.
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                // The delimiter is parsed as a wider integer; only single-byte
                // (ASCII) delimiters are supported.
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}
2488
2489fn get_key_envelope(
2491 included_items: &[SourceIncludeMetadata],
2492 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2493 key_envelope_no_encoding: bool,
2494) -> Result<KeyEnvelope, PlanError> {
2495 let key_definition = included_items
2496 .iter()
2497 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2498 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2499 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2500 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2501 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2502 (Some(name), _) if key_envelope_no_encoding => {
2503 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2504 }
2505 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2506 (_, None) => {
2507 sql_bail!(
2511 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2512 got bare FORMAT"
2513 );
2514 }
2515 }
2516 } else {
2517 Ok(KeyEnvelope::None)
2518 }
2519}
2520
2521fn get_unnamed_key_envelope(
2524 key: Option<&DataEncoding<ReferencedConnection>>,
2525) -> Result<KeyEnvelope, PlanError> {
2526 let is_composite = match key {
2530 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2531 Some(
2532 DataEncoding::Avro(_)
2533 | DataEncoding::Csv(_)
2534 | DataEncoding::Protobuf(_)
2535 | DataEncoding::Regex { .. },
2536 ) => true,
2537 None => false,
2538 };
2539
2540 if is_composite {
2541 Ok(KeyEnvelope::Flattened)
2542 } else {
2543 Ok(KeyEnvelope::Named("key".to_string()))
2544 }
2545}
2546
2547pub fn describe_create_view(
2548 _: &StatementContext,
2549 _: CreateViewStatement<Aug>,
2550) -> Result<StatementDesc, PlanError> {
2551 Ok(StatementDesc::new(None))
2552}
2553
/// Plans a view definition into a [`View`] and the qualified name it will be
/// created under.
///
/// Fails if the query contains parameters or if the (possibly renamed) column
/// names contain duplicates.
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    // Normalize a full CREATE VIEW statement to obtain the canonical SQL text
    // recorded on the view. IF EXISTS behavior is normalized to `Error`.
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    // Planning with `QueryLifetime::View` must not produce a non-trivial
    // row-set finishing; anything else is a planner bug.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    // Views cannot reference parameters ($1, $2, ...).
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    // Resolve the global IDs the query depends on back to catalog item IDs.
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Temporary views live in the temporary schema; others get a regular
    // qualified name.
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Apply the optional column-name list from the statement, then reject
    // duplicate column names.
    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}
2626
/// Plans `CREATE [OR REPLACE] [TEMPORARY] VIEW`.
pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    // The planning context may ask us to suppress IF EXISTS/OR REPLACE
    // semantics entirely.
    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    // OR REPLACE: plan dropping the existing view, unless the new definition
    // would depend on the item being replaced.
    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        if let Some(id) = maybe_item_to_drop {
            // Replacing a view the new definition depends on would leave a
            // dangling reference; reject it.
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    // Everything transitively dependent on the replaced view must be dropped
    // with it.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    validate_view_dependencies(scx, &view.dependencies.0)?;

    // Unless OR REPLACE / IF NOT EXISTS applies, fail if an item with this
    // name already exists.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
2712
2713fn validate_view_dependencies(
2715 scx: &StatementContext,
2716 dependencies: &BTreeSet<CatalogItemId>,
2717) -> Result<(), PlanError> {
2718 for id in dependencies {
2719 let item = scx.catalog.get_item(id);
2720 if item.replacement_target().is_some() {
2721 let name = scx.catalog.minimal_qualification(item.name());
2722 return Err(PlanError::InvalidDependency {
2723 name: name.to_string(),
2724 item_type: format!("replacement {}", item.item_type()),
2725 });
2726 }
2727 }
2728
2729 Ok(())
2730}
2731
2732pub fn describe_create_materialized_view(
2733 _: &StatementContext,
2734 _: CreateMaterializedViewStatement<Aug>,
2735) -> Result<StatementDesc, PlanError> {
2736 Ok(StatementDesc::new(None))
2737}
2738
2739pub fn describe_create_continual_task(
2740 _: &StatementContext,
2741 _: CreateContinualTaskStatement<Aug>,
2742) -> Result<StatementDesc, PlanError> {
2743 Ok(StatementDesc::new(None))
2744}
2745
2746pub fn describe_create_network_policy(
2747 _: &StatementContext,
2748 _: CreateNetworkPolicyStatement<Aug>,
2749) -> Result<StatementDesc, PlanError> {
2750 Ok(StatementDesc::new(None))
2751}
2752
2753pub fn describe_alter_network_policy(
2754 _: &StatementContext,
2755 _: AlterNetworkPolicyStatement<Aug>,
2756) -> Result<StatementDesc, PlanError> {
2757 Ok(StatementDesc::new(None))
2758}
2759
/// Plans `CREATE MATERIALIZED VIEW`.
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Resolve and pin the target cluster into the statement so the normalized
    // SQL text names it explicitly.
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Replica-targeted materialized views are gated behind a feature flag; the
    // named replica must exist in the resolved cluster.
    let target_replica = match &stmt.in_cluster_replica {
        Some(replica_name) => {
            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;

            let cluster = scx.catalog.get_cluster(cluster_id);
            let replica_id = cluster
                .replica_ids()
                .get(replica_name.as_str())
                .copied()
                .ok_or_else(|| {
                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
                })?;
            Some(replica_id)
        }
        None => None,
    };

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    // Plan the underlying query; materialized views may not contain
    // parameters, and the finishing must be trivial for this lifetime.
    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    // Apply the optional column-name list from the statement.
    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    // Destructure the statement's WITH (...) options.
    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    // Accumulate all REFRESH options into a single schedule. An empty schedule
    // (e.g. only REFRESH ON COMMIT was given) becomes `None`.
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            // Only REFRESH ON COMMIT is available without the feature flag.
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    // Purification rewrites REFRESH AT CREATION; seeing it
                    // here is a planner bug.
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    // The AT expression must fold to an mz_timestamp literal.
                    transform_ast::transform(scx, &mut time)?; let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    // The interval must be positive, month-free (months have
                    // variable length), and fit in u64 milliseconds.
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    // Purification fills in ALIGNED TO; a missing value here
                    // is a planner bug.
                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    // ALIGNED TO must likewise fold to an mz_timestamp literal.
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    // Map ASSERT NOT NULL column names to column indexes, rejecting unknown
    // and (after sorting) duplicate columns.
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // If the planning context asks us to ignore IF EXISTS errors, treat the
    // statement as IF NOT EXISTS.
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        // OR REPLACE: plan dropping the existing object, unless the new
        // definition would depend on the item being replaced.
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    // Everything transitively dependent on the replaced item must be dropped
    // with it.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Handle `REPLACEMENT FOR <target>` (feature-flagged): the target must be
    // a non-system materialized view, must not already have a replacement, and
    // the replacement must not introduce a dependency cycle through it.
    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacement_for {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        // Depending on any dependent of the target would make the replacement
        // (transitively) depend on itself.
        for dependent in scx.catalog.item_dependents(target.id()) {
            if let ObjectId::Item(id) = dependent
                && dependencies.contains(&id)
            {
                sql_bail!(
                    "replacement would cause {} to depend on itself",
                    scx.catalog.minimal_qualification(target.name()),
                );
            }
        }

        // The replacement depends on its target.
        dependencies.insert(target.id());

        // Only one replacement per target is allowed.
        for use_id in target.used_by() {
            let use_item = scx.get_item(use_id);
            if use_item.replacement_target() == Some(target.id()) {
                sql_bail!(
                    "cannot replace {} because it already has a replacement: {}",
                    scx.catalog.minimal_qualification(target.name()),
                    scx.catalog.minimal_qualification(use_item.name()),
                );
            }
        }

        replacement_target = Some(target.id());
    }

    validate_view_dependencies(scx, &dependencies)?;

    // Unless OR REPLACE / IF NOT EXISTS applies, fail if an item with this
    // name already exists.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            target_replica,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
3115
// Generates `MaterializedViewOptionExtracted`, which
// `plan_create_materialized_view` destructures to read the statement's WITH
// options. Options marked `AllowMultiple` may be specified repeatedly and are
// collected rather than rejected.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3123
3124pub fn plan_create_continual_task(
3125 scx: &StatementContext,
3126 mut stmt: CreateContinualTaskStatement<Aug>,
3127) -> Result<Plan, PlanError> {
3128 match &stmt.sugar {
3129 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3130 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3131 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3132 }
3133 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3134 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3135 }
3136 };
3137 let cluster_id = match &stmt.in_cluster {
3138 None => scx.catalog.resolve_cluster(None)?.id(),
3139 Some(in_cluster) => in_cluster.id,
3140 };
3141 stmt.in_cluster = Some(ResolvedClusterName {
3142 id: cluster_id,
3143 print_name: None,
3144 });
3145
3146 let create_sql =
3147 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3148
3149 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3150
3151 let mut desc = match stmt.columns {
3156 None => None,
3157 Some(columns) => {
3158 let mut desc_columns = Vec::with_capacity(columns.capacity());
3159 for col in columns.iter() {
3160 desc_columns.push((
3161 normalize::column_name(col.name.clone()),
3162 SqlColumnType {
3163 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3164 nullable: false,
3165 },
3166 ));
3167 }
3168 Some(RelationDesc::from_names_and_types(desc_columns))
3169 }
3170 };
3171 let input = scx.get_item_by_resolved_name(&stmt.input)?;
3172 match input.item_type() {
3173 CatalogItemType::ContinualTask
3176 | CatalogItemType::Table
3177 | CatalogItemType::MaterializedView
3178 | CatalogItemType::Source => {}
3179 CatalogItemType::Sink
3180 | CatalogItemType::View
3181 | CatalogItemType::Index
3182 | CatalogItemType::Type
3183 | CatalogItemType::Func
3184 | CatalogItemType::Secret
3185 | CatalogItemType::Connection => {
3186 sql_bail!(
3187 "CONTINUAL TASK cannot use {} as an input",
3188 input.item_type()
3189 );
3190 }
3191 }
3192
3193 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3194 let ct_name = stmt.name;
3195 let placeholder_id = match &ct_name {
3196 ResolvedItemName::ContinualTask { id, name } => {
3197 let desc = match desc.as_ref().cloned() {
3198 Some(x) => x,
3199 None => {
3200 let desc = input.relation_desc().expect("item type checked above");
3205 desc.into_owned()
3206 }
3207 };
3208 qcx.ctes.insert(
3209 *id,
3210 CteDesc {
3211 name: name.item.clone(),
3212 desc,
3213 },
3214 );
3215 Some(*id)
3216 }
3217 _ => None,
3218 };
3219
3220 let mut exprs = Vec::new();
3221 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3222 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3223 let query::PlannedRootQuery {
3224 expr,
3225 desc: desc_query,
3226 finishing,
3227 scope: _,
3228 } = query::plan_ct_query(&mut qcx, query)?;
3229 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3232 &finishing,
3233 expr.arity()
3234 ));
3235 if expr.contains_parameters()? {
3236 if expr.contains_parameters()? {
3237 return Err(PlanError::ParameterNotAllowed(
3238 "continual tasks".to_string(),
3239 ));
3240 }
3241 }
3242 let expr = match desc.as_mut() {
3243 None => {
3244 desc = Some(desc_query);
3245 expr
3246 }
3247 Some(desc) => {
3248 if desc_query.arity() > desc.arity() {
3251 sql_bail!(
3252 "statement {}: INSERT has more expressions than target columns",
3253 idx
3254 );
3255 }
3256 if desc_query.arity() < desc.arity() {
3257 sql_bail!(
3258 "statement {}: INSERT has more target columns than expressions",
3259 idx
3260 );
3261 }
3262 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3265 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3266 let expr = expr.map_err(|e| {
3267 sql_err!(
3268 "statement {}: column {} is of type {} but expression is of type {}",
3269 idx,
3270 desc.get_name(e.column).quoted(),
3271 qcx.humanize_sql_scalar_type(&e.target_type, false),
3272 qcx.humanize_sql_scalar_type(&e.source_type, false),
3273 )
3274 })?;
3275
3276 let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3279 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3280 if updated {
3281 let new_types = zip_types().map(|(ct, q)| {
3282 let mut ct = ct.clone();
3283 if q.nullable {
3284 ct.nullable = true;
3285 }
3286 ct
3287 });
3288 *desc = RelationDesc::from_names_and_types(
3289 desc.iter_names().cloned().zip_eq(new_types),
3290 );
3291 }
3292
3293 expr
3294 }
3295 };
3296 match stmt {
3297 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3298 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3299 }
3300 }
3301 let expr = exprs
3304 .into_iter()
3305 .reduce(|acc, expr| acc.union(expr))
3306 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3307 let dependencies = expr
3308 .depends_on()
3309 .into_iter()
3310 .map(|gid| scx.catalog.resolve_item_id(&gid))
3311 .collect();
3312
3313 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3314 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3315 if let Some(dup) = column_names.iter().duplicates().next() {
3316 sql_bail!("column {} specified more than once", dup.quoted());
3317 }
3318
3319 let name = match &ct_name {
3321 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3322 ResolvedItemName::ContinualTask { name, .. } => {
3323 let name = scx.allocate_qualified_name(name.clone())?;
3324 let full_name = scx.catalog.resolve_full_name(&name);
3325 let partial_name = PartialItemName::from(full_name.clone());
3326 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3329 return Err(PlanError::ItemAlreadyExists {
3330 name: full_name.to_string(),
3331 item_type: item.item_type(),
3332 });
3333 }
3334 name
3335 }
3336 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3337 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3338 };
3339
3340 let as_of = stmt.as_of.map(Timestamp::from);
3341 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3342 name,
3343 placeholder_id,
3344 desc,
3345 input_id: input.global_id(),
3346 with_snapshot: snapshot.unwrap_or(true),
3347 continual_task: MaterializedView {
3348 create_sql,
3349 expr,
3350 dependencies,
3351 column_names,
3352 replacement_target: None,
3353 cluster_id,
3354 target_replica: None,
3355 non_null_assertions: Vec::new(),
3356 compaction_window: None,
3357 refresh_schedule: None,
3358 as_of,
3359 },
3360 }))
3361}
3362
3363fn continual_task_query<'a>(
3364 ct_name: &ResolvedItemName,
3365 stmt: &'a ast::ContinualTaskStmt<Aug>,
3366) -> Option<ast::Query<Aug>> {
3367 match stmt {
3368 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3369 table_name: _,
3370 columns,
3371 source,
3372 returning,
3373 }) => {
3374 if !columns.is_empty() || !returning.is_empty() {
3375 return None;
3376 }
3377 match source {
3378 ast::InsertSource::Query(query) => Some(query.clone()),
3379 ast::InsertSource::DefaultValues => None,
3380 }
3381 }
3382 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3383 table_name: _,
3384 alias,
3385 using,
3386 selection,
3387 }) => {
3388 if !using.is_empty() {
3389 return None;
3390 }
3391 let from = ast::TableWithJoins {
3393 relation: ast::TableFactor::Table {
3394 name: ct_name.clone(),
3395 alias: alias.clone(),
3396 },
3397 joins: Vec::new(),
3398 };
3399 let select = ast::Select {
3400 from: vec![from],
3401 selection: selection.clone(),
3402 distinct: None,
3403 projection: vec![ast::SelectItem::Wildcard],
3404 group_by: Vec::new(),
3405 having: None,
3406 qualify: None,
3407 options: Vec::new(),
3408 };
3409 let query = ast::Query {
3410 ctes: ast::CteBlock::Simple(Vec::new()),
3411 body: ast::SetExpr::Select(Box::new(select)),
3412 order_by: Vec::new(),
3413 limit: None,
3414 offset: None,
3415 };
3416 Some(query)
3418 }
3419 }
3420}
3421
// Generates `ContinualTaskOptionExtracted` for the WITH options accepted by
// `CREATE CONTINUAL TASK` (currently just `SNAPSHOT <bool>`).
generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3423
3424pub fn describe_create_sink(
3425 _: &StatementContext,
3426 _: CreateSinkStatement<Aug>,
3427) -> Result<StatementDesc, PlanError> {
3428 Ok(StatementDesc::new(None))
3429}
3430
// Generates `CreateSinkOptionExtracted` with typed accessors for each WITH
// option accepted by `CREATE SINK`.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3438
3439pub fn plan_create_sink(
3440 scx: &StatementContext,
3441 stmt: CreateSinkStatement<Aug>,
3442) -> Result<Plan, PlanError> {
3443 let Some(name) = stmt.name.clone() else {
3445 return Err(PlanError::MissingName(CatalogItemType::Sink));
3446 };
3447 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3448 let full_name = scx.catalog.resolve_full_name(&name);
3449 let partial_name = PartialItemName::from(full_name.clone());
3450 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3451 return Err(PlanError::ItemAlreadyExists {
3452 name: full_name.to_string(),
3453 item_type: item.item_type(),
3454 });
3455 }
3456
3457 plan_sink(scx, stmt)
3458}
3459
3460fn plan_sink(
3465 scx: &StatementContext,
3466 mut stmt: CreateSinkStatement<Aug>,
3467) -> Result<Plan, PlanError> {
3468 let CreateSinkStatement {
3469 name,
3470 in_cluster: _,
3471 from,
3472 connection,
3473 format,
3474 envelope,
3475 mode,
3476 if_not_exists,
3477 with_options,
3478 } = stmt.clone();
3479
3480 let Some(name) = name else {
3481 return Err(PlanError::MissingName(CatalogItemType::Sink));
3482 };
3483 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3484
3485 let envelope = match (&connection, envelope, mode) {
3486 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3488 SinkEnvelope::Upsert
3489 }
3490 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3491 SinkEnvelope::Debezium
3492 }
3493 (CreateSinkConnection::Kafka { .. }, None, None) => {
3494 sql_bail!("ENVELOPE clause is required")
3495 }
3496 (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3497 sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3498 }
3499 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3501 SinkEnvelope::Upsert
3502 }
3503 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
3504 SinkEnvelope::Append
3505 }
3506 (CreateSinkConnection::Iceberg { .. }, None, None) => {
3507 sql_bail!("MODE clause is required")
3508 }
3509 (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3510 sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3511 }
3512 };
3513
3514 let from_name = &from;
3515 let from = scx.get_item_by_resolved_name(&from)?;
3516
3517 {
3518 use CatalogItemType::*;
3519 match from.item_type() {
3520 Table | Source | MaterializedView | ContinualTask => {
3521 if from.replacement_target().is_some() {
3522 let name = scx.catalog.minimal_qualification(from.name());
3523 return Err(PlanError::InvalidSinkFrom {
3524 name: name.to_string(),
3525 item_type: format!("replacement {}", from.item_type()),
3526 });
3527 }
3528 }
3529 Sink | View | Index | Type | Func | Secret | Connection => {
3530 let name = scx.catalog.minimal_qualification(from.name());
3531 return Err(PlanError::InvalidSinkFrom {
3532 name: name.to_string(),
3533 item_type: from.item_type().to_string(),
3534 });
3535 }
3536 }
3537 }
3538
3539 if from.id().is_system() {
3540 bail_unsupported!("creating a sink directly on a catalog object");
3541 }
3542
3543 let desc = from.relation_desc().expect("item type checked above");
3544 let key_indices = match &connection {
3545 CreateSinkConnection::Kafka { key: Some(key), .. }
3546 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3547 let key_columns = key
3548 .key_columns
3549 .clone()
3550 .into_iter()
3551 .map(normalize::column_name)
3552 .collect::<Vec<_>>();
3553 let mut uniq = BTreeSet::new();
3554 for col in key_columns.iter() {
3555 if !uniq.insert(col) {
3556 sql_bail!("duplicate column referenced in KEY: {}", col);
3557 }
3558 }
3559 let indices = key_columns
3560 .iter()
3561 .map(|col| -> anyhow::Result<usize> {
3562 let name_idx =
3563 desc.get_by_name(col)
3564 .map(|(idx, _type)| idx)
3565 .ok_or_else(|| {
3566 sql_err!("column referenced in KEY does not exist: {}", col)
3567 })?;
3568 if desc.get_unambiguous_name(name_idx).is_none() {
3569 sql_err!("column referenced in KEY is ambiguous: {}", col);
3570 }
3571 Ok(name_idx)
3572 })
3573 .collect::<Result<Vec<_>, _>>()?;
3574
3575 if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
3578 let cols: Vec<_> = desc.iter().collect();
3579 for &idx in &indices {
3580 let (col_name, col_type) = cols[idx];
3581 let scalar = &col_type.scalar_type;
3582 let is_valid = matches!(
3583 scalar,
3584 SqlScalarType::Bool
3586 | SqlScalarType::Int16
3587 | SqlScalarType::Int32
3588 | SqlScalarType::Int64
3589 | SqlScalarType::UInt16
3590 | SqlScalarType::UInt32
3591 | SqlScalarType::UInt64
3592 | SqlScalarType::Numeric { .. }
3594 | SqlScalarType::Date
3596 | SqlScalarType::Time
3597 | SqlScalarType::Timestamp { .. }
3598 | SqlScalarType::TimestampTz { .. }
3599 | SqlScalarType::Interval
3600 | SqlScalarType::MzTimestamp
3601 | SqlScalarType::String
3603 | SqlScalarType::Char { .. }
3604 | SqlScalarType::VarChar { .. }
3605 | SqlScalarType::PgLegacyChar
3606 | SqlScalarType::PgLegacyName
3607 | SqlScalarType::Bytes
3608 | SqlScalarType::Jsonb
3609 | SqlScalarType::Uuid
3611 | SqlScalarType::Oid
3612 | SqlScalarType::RegProc
3613 | SqlScalarType::RegType
3614 | SqlScalarType::RegClass
3615 | SqlScalarType::MzAclItem
3616 | SqlScalarType::AclItem
3617 | SqlScalarType::Int2Vector
3618 | SqlScalarType::Range { .. }
3620 );
3621 if !is_valid {
3622 return Err(PlanError::IcebergSinkUnsupportedKeyType {
3623 column: col_name.to_string(),
3624 column_type: format!("{:?}", scalar),
3625 });
3626 }
3627 }
3628 }
3629
3630 let is_valid_key = desc
3631 .typ()
3632 .keys
3633 .iter()
3634 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3635
3636 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3637 if key.not_enforced {
3638 scx.catalog
3639 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3640 key: key_columns.clone(),
3641 name: name.item.clone(),
3642 })
3643 } else {
3644 return Err(PlanError::UpsertSinkWithInvalidKey {
3645 name: from_name.full_name_str(),
3646 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3647 valid_keys: desc
3648 .typ()
3649 .keys
3650 .iter()
3651 .map(|key| {
3652 key.iter()
3653 .map(|col| desc.get_name(*col).as_str().into())
3654 .collect()
3655 })
3656 .collect(),
3657 });
3658 }
3659 }
3660 Some(indices)
3661 }
3662 CreateSinkConnection::Kafka { key: None, .. }
3663 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3664 };
3665
3666 if key_indices.is_some() && envelope == SinkEnvelope::Append {
3667 sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
3668 }
3669
3670 if envelope == SinkEnvelope::Append {
3672 if let CreateSinkConnection::Iceberg { .. } = &connection {
3673 use mz_storage_types::sinks::{
3674 ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
3675 };
3676 for (col_name, _) in desc.iter() {
3677 if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
3678 || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
3679 {
3680 sql_bail!(
3681 "column {} conflicts with the system column that MODE APPEND \
3682 adds to the Iceberg table",
3683 col_name.quoted()
3684 );
3685 }
3686 }
3687 }
3688 }
3689
3690 let headers_index = match &connection {
3691 CreateSinkConnection::Kafka {
3692 headers: Some(headers),
3693 ..
3694 } => {
3695 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3696
3697 match envelope {
3698 SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3699 SinkEnvelope::Debezium => {
3700 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3701 }
3702 };
3703
3704 let headers = normalize::column_name(headers.clone());
3705 let (idx, ty) = desc
3706 .get_by_name(&headers)
3707 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3708
3709 if desc.get_unambiguous_name(idx).is_none() {
3710 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3711 }
3712
3713 match &ty.scalar_type {
3714 SqlScalarType::Map { value_type, .. }
3715 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3716 _ => sql_bail!(
3717 "HEADERS column must have type map[text => text] or map[text => bytea]"
3718 ),
3719 }
3720
3721 Some(idx)
3722 }
3723 _ => None,
3724 };
3725
3726 let relation_key_indices = desc.typ().keys.get(0).cloned();
3728
3729 let key_desc_and_indices = key_indices.map(|key_indices| {
3730 let cols = desc
3731 .iter()
3732 .map(|(name, ty)| (name.clone(), ty.clone()))
3733 .collect::<Vec<_>>();
3734 let (names, types): (Vec<_>, Vec<_>) =
3735 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3736 let typ = SqlRelationType::new(types);
3737 (RelationDesc::new(typ, names), key_indices)
3738 });
3739
3740 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3741 return Err(PlanError::UpsertSinkWithoutKey);
3742 }
3743
3744 let CreateSinkOptionExtracted {
3745 snapshot,
3746 version,
3747 partition_strategy: _,
3748 seen: _,
3749 commit_interval,
3750 } = with_options.try_into()?;
3751
3752 let connection_builder = match connection {
3753 CreateSinkConnection::Kafka {
3754 connection,
3755 options,
3756 ..
3757 } => kafka_sink_builder(
3758 scx,
3759 connection,
3760 options,
3761 format,
3762 relation_key_indices,
3763 key_desc_and_indices,
3764 headers_index,
3765 desc.into_owned(),
3766 envelope,
3767 from.id(),
3768 commit_interval,
3769 )?,
3770 CreateSinkConnection::Iceberg {
3771 connection,
3772 aws_connection,
3773 options,
3774 ..
3775 } => iceberg_sink_builder(
3776 scx,
3777 connection,
3778 aws_connection,
3779 options,
3780 relation_key_indices,
3781 key_desc_and_indices,
3782 commit_interval,
3783 )?,
3784 };
3785
3786 let with_snapshot = snapshot.unwrap_or(true);
3788 let version = version.unwrap_or(0);
3790
3791 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3795 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3796
3797 Ok(Plan::CreateSink(CreateSinkPlan {
3798 name,
3799 sink: Sink {
3800 create_sql,
3801 from: from.global_id(),
3802 connection: connection_builder,
3803 envelope,
3804 version,
3805 commit_interval,
3806 },
3807 with_snapshot,
3808 if_not_exists,
3809 in_cluster: in_cluster.id(),
3810 }))
3811}
3812
3813fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3814 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3815
3816 let existing_keys = desc
3817 .typ()
3818 .keys
3819 .iter()
3820 .map(|key_columns| {
3821 key_columns
3822 .iter()
3823 .map(|col| desc.get_name(*col).as_str())
3824 .join(", ")
3825 })
3826 .join(", ");
3827
3828 sql_err!(
3829 "Key constraint ({}) conflicts with existing key ({})",
3830 user_keys,
3831 existing_keys
3832 )
3833}
3834
/// Typed collection of the Confluent Schema Registry options attached to an
/// Avro format specification, populated by the `TryFrom<Vec<CsrConfigOption>>`
/// impl below.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    // Options seen so far; used to reject duplicate specifications.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    // Overrides for the generated Avro key/value schema fullnames.
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    // NULL DEFAULTS flag, forwarded to `AvroSchemaGenerator`.
    pub(crate) null_defaults: bool,
    // DOC ON comments for the value/key schemas, keyed by their target
    // (a type or a specific column).
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    // Compatibility levels requested for the key/value schema subjects.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3848
impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    /// Folds a raw CSR option list into [`CsrConfigOptionExtracted`],
    /// rejecting duplicate options. Unqualified `DOC ON` comments are
    /// collected separately and merged into both the key and value doc maps
    /// at the end, without overriding more specific KEY-/VALUE-only entries.
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        // `DOC ON ... (ALL)` entries, merged in after the main loop.
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            // Each option may appear at most once.
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            // Wraps a value-parsing error with the offending option's name.
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            // Parses an optional string value into a CSR compatibility level
            // (case-insensitively), surfacing failures via `better_error`.
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    // DOC ON requires a (string) value.
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    // Resolve the DOC ON target to a column or a whole type.
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        // Apply unqualified DOC ON comments to both schemas, but never
        // override an explicit KEY-/VALUE-only comment for the same target.
        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}
3944
3945fn iceberg_sink_builder(
3946 scx: &StatementContext,
3947 catalog_connection: ResolvedItemName,
3948 aws_connection: ResolvedItemName,
3949 options: Vec<IcebergSinkConfigOption<Aug>>,
3950 relation_key_indices: Option<Vec<usize>>,
3951 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3952 commit_interval: Option<Duration>,
3953) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3954 scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3955 let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3956 let catalog_connection_id = catalog_connection_item.id();
3957 let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3958 let aws_connection_id = aws_connection_item.id();
3959 if !matches!(
3960 catalog_connection_item.connection()?,
3961 Connection::IcebergCatalog(_)
3962 ) {
3963 sql_bail!(
3964 "{} is not an iceberg catalog connection",
3965 scx.catalog
3966 .resolve_full_name(catalog_connection_item.name())
3967 .to_string()
3968 .quoted()
3969 );
3970 };
3971
3972 if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3973 sql_bail!(
3974 "{} is not an AWS connection",
3975 scx.catalog
3976 .resolve_full_name(aws_connection_item.name())
3977 .to_string()
3978 .quoted()
3979 );
3980 }
3981
3982 let IcebergSinkConfigOptionExtracted {
3983 table,
3984 namespace,
3985 seen: _,
3986 }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3987
3988 let Some(table) = table else {
3989 sql_bail!("Iceberg sink must specify TABLE");
3990 };
3991 let Some(namespace) = namespace else {
3992 sql_bail!("Iceberg sink must specify NAMESPACE");
3993 };
3994 if commit_interval.is_none() {
3995 sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3996 }
3997
3998 Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3999 catalog_connection_id,
4000 catalog_connection: catalog_connection_id,
4001 aws_connection_id,
4002 aws_connection: aws_connection_id,
4003 table,
4004 namespace,
4005 relation_key_indices,
4006 key_desc_and_indices,
4007 }))
4008}
4009
/// Plans the connection half of a Kafka sink: validates the connection and
/// options, generates key/value schemas for the chosen format, and plans the
/// optional `PARTITION BY` expression.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    // COMMIT INTERVAL is an Iceberg-only option.
    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // LEGACY IDS is mutually exclusive with the explicit prefix options.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    // Validates an optional count-like option is positive and converts it to
    // `NonNeg<i32>`.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Validates a CSR connection reference used in an Avro format and
    // extracts its schema-generation options. Rejects source-only clauses
    // (SEED, KEY/VALUE STRATEGY).
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // Fullname overrides must be given for both schemas or neither.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps a FORMAT clause (for either the key or the value side) to the
    // corresponding `KafkaSinkFormatType`, generating Avro schemas on demand.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional PARTITION BY expression against the value relation.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    // Under Debezium, only key columns are stable across
                    // updates; referencing any other column is an error.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression must be castable to uint8 and uncorrelated.
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // Resolve key and value formats. A bare FORMAT applies to both sides.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4295
4296pub fn describe_create_index(
4297 _: &StatementContext,
4298 _: CreateIndexStatement<Aug>,
4299) -> Result<StatementDesc, PlanError> {
4300 Ok(StatementDesc::new(None))
4301}
4302
4303pub fn plan_create_index(
4304 scx: &StatementContext,
4305 mut stmt: CreateIndexStatement<Aug>,
4306) -> Result<Plan, PlanError> {
4307 let CreateIndexStatement {
4308 name,
4309 on_name,
4310 in_cluster,
4311 key_parts,
4312 with_options,
4313 if_not_exists,
4314 } = &mut stmt;
4315 let on = scx.get_item_by_resolved_name(on_name)?;
4316
4317 {
4318 use CatalogItemType::*;
4319 match on.item_type() {
4320 Table | Source | View | MaterializedView | ContinualTask => {
4321 if on.replacement_target().is_some() {
4322 sql_bail!(
4323 "index cannot be created on {} because it is a replacement {}",
4324 on_name.full_name_str(),
4325 on.item_type(),
4326 );
4327 }
4328 }
4329 Sink | Index | Type | Func | Secret | Connection => {
4330 sql_bail!(
4331 "index cannot be created on {} because it is a {}",
4332 on_name.full_name_str(),
4333 on.item_type(),
4334 );
4335 }
4336 }
4337 }
4338
4339 let on_desc = on.relation_desc().expect("item type checked above");
4340
4341 let filled_key_parts = match key_parts {
4342 Some(kp) => kp.to_vec(),
4343 None => {
4344 let mut name_counts = BTreeMap::new();
4349 for name in on_desc.iter_names() {
4350 *name_counts.entry(name).or_insert(0usize) += 1;
4351 }
4352 let key = on_desc.typ().default_key();
4353 key.iter()
4354 .map(|i| {
4355 let name = on_desc.get_name(*i);
4356 if name_counts.get(name).copied() == Some(1) {
4357 Expr::Identifier(vec![name.clone().into()])
4358 } else {
4359 Expr::Value(Value::Number((i + 1).to_string()))
4360 }
4361 })
4362 .collect()
4363 }
4364 };
4365 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4366
4367 let index_name = if let Some(name) = name {
4368 QualifiedItemName {
4369 qualifiers: on.name().qualifiers.clone(),
4370 item: normalize::ident(name.clone()),
4371 }
4372 } else {
4373 let mut idx_name = QualifiedItemName {
4374 qualifiers: on.name().qualifiers.clone(),
4375 item: on.name().item.clone(),
4376 };
4377 if key_parts.is_none() {
4378 idx_name.item += "_primary_idx";
4380 } else {
4381 let index_name_col_suffix = keys
4384 .iter()
4385 .map(|k| match k {
4386 mz_expr::MirScalarExpr::Column(i, name) => {
4387 match (on_desc.get_unambiguous_name(*i), &name.0) {
4388 (Some(col_name), _) => col_name.to_string(),
4389 (None, Some(name)) => name.to_string(),
4390 (None, None) => format!("{}", i + 1),
4391 }
4392 }
4393 _ => "expr".to_string(),
4394 })
4395 .join("_");
4396 write!(idx_name.item, "_{index_name_col_suffix}_idx")
4397 .expect("write on strings cannot fail");
4398 idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4399 }
4400
4401 if !*if_not_exists {
4402 scx.catalog.find_available_name(idx_name)
4403 } else {
4404 idx_name
4405 }
4406 };
4407
4408 let full_name = scx.catalog.resolve_full_name(&index_name);
4410 let partial_name = PartialItemName::from(full_name.clone());
4411 if let (Ok(item), false, false) = (
4419 scx.catalog.resolve_item_or_type(&partial_name),
4420 *if_not_exists,
4421 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4422 ) {
4423 return Err(PlanError::ItemAlreadyExists {
4424 name: full_name.to_string(),
4425 item_type: item.item_type(),
4426 });
4427 }
4428
4429 let options = plan_index_options(scx, with_options.clone())?;
4430 let cluster_id = match in_cluster {
4431 None => scx.resolve_cluster(None)?.id(),
4432 Some(in_cluster) => in_cluster.id,
4433 };
4434
4435 *in_cluster = Some(ResolvedClusterName {
4436 id: cluster_id,
4437 print_name: None,
4438 });
4439
4440 *name = Some(Ident::new(index_name.item.clone())?);
4442 *key_parts = Some(filled_key_parts);
4443 let if_not_exists = *if_not_exists;
4444
4445 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4446 let compaction_window = options.iter().find_map(|o| {
4447 #[allow(irrefutable_let_patterns)]
4448 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4449 Some(lcw.clone())
4450 } else {
4451 None
4452 }
4453 });
4454
4455 Ok(Plan::CreateIndex(CreateIndexPlan {
4456 name: index_name,
4457 index: Index {
4458 create_sql,
4459 on: on.global_id(),
4460 keys,
4461 cluster_id,
4462 compaction_window,
4463 },
4464 if_not_exists,
4465 }))
4466}
4467
4468pub fn describe_create_type(
4469 _: &StatementContext,
4470 _: CreateTypeStatement<Aug>,
4471) -> Result<StatementDesc, PlanError> {
4472 Ok(StatementDesc::new(None))
4473}
4474
/// Plans `CREATE TYPE`, which creates a named `LIST`, `MAP`, or record type
/// whose components must reference existing *named* catalog types.
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Normalize first so the stored `create_sql` reflects the statement as
    // written, with names fully qualified.
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    // Validates that `data_type` may be embedded in the new type, returning
    // the referenced catalog item id plus its type modifiers.
    //
    // `as_type` and `key` only feed error messages (e.g. `as_type = "LIST "`,
    // `key = "ELEMENT TYPE"`).
    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        // Anonymous (unnamed) types must be materialized with their own
        // CREATE TYPE before they can be referenced here.
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            // The referenced item exists but is not a type (table, source, ...).
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            // `char` embedding is explicitly unsupported.
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Verify the modifiers are valid for the referenced type
                // before recording them; the result itself is unused here.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            // Each record field references a named type, validated the same
            // way as list/map components.
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Reject a name collision with any existing item that shares the type
    // namespace.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}
4591
// Option parser for `CREATE TYPE ... AS LIST (ELEMENT TYPE = ...)`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Option parser for `CREATE TYPE ... AS MAP (KEY TYPE = ..., VALUE TYPE = ...)`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4599
/// The planned effect of an `ALTER ROLE` statement: either a change to the
/// role's attributes or to one of its variable defaults.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}
4605
/// Role attributes extracted from `CREATE ROLE` / `ALTER ROLE` options.
///
/// Each field is `None` when the corresponding attribute was not mentioned in
/// the statement, so unmentioned attributes can be left unchanged.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    pub password: Option<Password>,
    // SCRAM iteration count captured from the system vars at planning time,
    // set whenever a non-NULL password is supplied.
    pub scram_iterations: Option<NonZeroU32>,
    // `Some(true)` when `PASSWORD NULL` was given, i.e. the password should
    // be cleared rather than set.
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}
4618
/// Converts `CREATE ROLE` / `ALTER ROLE` attribute options into
/// [`PlannedRoleAttributes`], rejecting conflicting, redundant, or
/// unsupported attributes.
fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            // NOTE: the guarded arms must precede the unguarded arms below so
            // that duplicates are detected before the option is applied.
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            // CREATECLUSTER/CREATEDB/CREATEROLE are rejected outright;
            // system privileges are the supported mechanism instead.
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    // Capture the SCRAM iteration count in effect now, so the
                    // password hash is tied to this planning moment.
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    // `PASSWORD NULL` requests clearing the password.
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    // NOINHERIT parses but is not yet implemented.
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}
4707
/// The planned effect of an `ALTER ROLE ... SET/RESET` variable clause.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}
4713
4714impl PlannedRoleVariable {
4715 pub fn name(&self) -> &str {
4716 match self {
4717 PlannedRoleVariable::Set { name, .. } => name,
4718 PlannedRoleVariable::Reset { name } => name,
4719 }
4720 }
4721}
4722
4723fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4724 let plan = match variable {
4725 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4726 name: name.to_string(),
4727 value: scl::plan_set_variable_to(value)?,
4728 },
4729 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4730 name: name.to_string(),
4731 },
4732 };
4733 Ok(plan)
4734}
4735
4736pub fn describe_create_role(
4737 _: &StatementContext,
4738 _: CreateRoleStatement,
4739) -> Result<StatementDesc, PlanError> {
4740 Ok(StatementDesc::new(None))
4741}
4742
4743pub fn plan_create_role(
4744 scx: &StatementContext,
4745 CreateRoleStatement { name, options }: CreateRoleStatement,
4746) -> Result<Plan, PlanError> {
4747 let attributes = plan_role_attributes(options, scx)?;
4748 Ok(Plan::CreateRole(CreateRolePlan {
4749 name: normalize::ident(name),
4750 attributes: attributes.into(),
4751 }))
4752}
4753
4754pub fn plan_create_network_policy(
4755 ctx: &StatementContext,
4756 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4757) -> Result<Plan, PlanError> {
4758 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4759 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4760
4761 let Some(rule_defs) = policy_options.rules else {
4762 sql_bail!("RULES must be specified when creating network policies.");
4763 };
4764
4765 let mut rules = vec![];
4766 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4767 let NetworkPolicyRuleOptionExtracted {
4768 seen: _,
4769 direction,
4770 action,
4771 address,
4772 } = options.try_into()?;
4773 let (direction, action, address) = match (direction, action, address) {
4774 (Some(direction), Some(action), Some(address)) => (
4775 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4776 NetworkPolicyRuleAction::try_from(action.as_str())?,
4777 PolicyAddress::try_from(address.as_str())?,
4778 ),
4779 (_, _, _) => {
4780 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4781 }
4782 };
4783 rules.push(NetworkPolicyRule {
4784 name: normalize::ident(name),
4785 direction,
4786 action,
4787 address,
4788 });
4789 }
4790
4791 if rules.len()
4792 > ctx
4793 .catalog
4794 .system_vars()
4795 .max_rules_per_network_policy()
4796 .try_into()?
4797 {
4798 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4799 }
4800
4801 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4802 name: normalize::ident(name),
4803 rules,
4804 }))
4805}
4806
4807pub fn plan_alter_network_policy(
4808 ctx: &StatementContext,
4809 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4810) -> Result<Plan, PlanError> {
4811 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4812
4813 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4814 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4815
4816 let Some(rule_defs) = policy_options.rules else {
4817 sql_bail!("RULES must be specified when creating network policies.");
4818 };
4819
4820 let mut rules = vec![];
4821 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4822 let NetworkPolicyRuleOptionExtracted {
4823 seen: _,
4824 direction,
4825 action,
4826 address,
4827 } = options.try_into()?;
4828
4829 let (direction, action, address) = match (direction, action, address) {
4830 (Some(direction), Some(action), Some(address)) => (
4831 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4832 NetworkPolicyRuleAction::try_from(action.as_str())?,
4833 PolicyAddress::try_from(address.as_str())?,
4834 ),
4835 (_, _, _) => {
4836 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4837 }
4838 };
4839 rules.push(NetworkPolicyRule {
4840 name: normalize::ident(name),
4841 direction,
4842 action,
4843 address,
4844 });
4845 }
4846 if rules.len()
4847 > ctx
4848 .catalog
4849 .system_vars()
4850 .max_rules_per_network_policy()
4851 .try_into()?
4852 {
4853 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4854 }
4855
4856 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4857 id: policy.id(),
4858 name: normalize::ident(name),
4859 rules,
4860 }))
4861}
4862
4863pub fn describe_create_cluster(
4864 _: &StatementContext,
4865 _: CreateClusterStatement<Aug>,
4866) -> Result<StatementDesc, PlanError> {
4867 Ok(StatementDesc::new(None))
4868}
4869
// Option parser for the `CREATE CLUSTER (...)` option list.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Option parser for `CREATE/ALTER NETWORK POLICY (RULES (...))`.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Option parser for a single network policy rule definition.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// Option parser for `ALTER CLUSTER` alter options.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// Option parser for the `UNTIL READY (...)` sub-options of `ALTER CLUSTER`.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Option parser for the `FEATURES (...)` clause of CREATE CLUSTER; each entry
// toggles an optimizer feature override (system users only — see
// `plan_create_cluster_inner`).
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4923
/// Plans `CREATE CLUSTER`.
///
/// For managed clusters, the resulting plan is additionally round-tripped
/// through `unplan_create_cluster` -> SQL -> parse -> resolve -> replan and
/// compared for equality, so any divergence between planning and unplanning
/// surfaces immediately as a `PlanError::Replan`.
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    // Unmanaged clusters cannot be unplanned (see `unplan_create_cluster`),
    // so only managed plans are checked.
    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}
4963
/// Plans `CREATE CLUSTER` into a `CreateClusterPlan`, without the replan
/// consistency check performed by `plan_create_cluster`.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Without an explicit MANAGED option, the cluster is managed unless
    // explicit REPLICAS were given.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system users.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // Modern ("cc") sizes always have disk enabled, so DISK is
            // rejected for them and merely deprecated otherwise.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        // REPLICATION FACTOR may only be set with a MANUAL schedule; with any
        // other schedule the factor is forced to 0 at creation time.
        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        // Explicit AZs for managed clusters are feature-gated.
        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Map the FEATURES clause onto optimizer feature overrides; anything
        // not mentioned keeps its default.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        // Reject every managed-only option for unmanaged clusters.
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
5130
/// Inverse of `plan_create_cluster_inner` for managed clusters: reconstructs
/// a `CREATE CLUSTER` AST from a `CreateClusterPlan`.
///
/// Used by the replan consistency check in `plan_create_cluster`; unmanaged
/// clusters are not supported.
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Exhaustively destructure the overrides so that adding a new
            // override forces this function to be revisited; only the fields
            // settable via FEATURES are round-tripped.
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
                enable_case_literal_transform: _,
                enable_simplify_quantified_comparisons: _,
                enable_coalesce_case_transform: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // Empty AZ list means the option was not specified.
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // REPLICATION FACTOR is only emitted for MANUAL schedules; for
            // refresh schedules planning forces it to 0 (or it may reach 1 at
            // runtime), so it is omitted from the statement.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
5234
// Option parser for replica definitions (CREATE CLUSTER REPLICA and the
// REPLICAS list of unmanaged CREATE CLUSTER).
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
5250
/// Plans the options of a single cluster replica into a `ReplicaConfig`,
/// distinguishing orchestrated (SIZE-based) from unorchestrated
/// (explicit-address) replicas.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    // The combination of provided options determines the replica flavor.
    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        // Orchestrated replica: SIZE given, no explicit addresses.
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // Modern ("cc") sizes always have disk enabled, so DISK is
                // rejected for them and merely deprecated otherwise.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        // Unorchestrated replica: addressed explicitly; gated behind an
        // unsafe feature flag.
        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            // The two address lists must line up one-to-one.
            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5343
5344fn plan_compute_replica_config(
5348 introspection_interval: Option<OptionalDuration>,
5349 introspection_debugging: bool,
5350) -> Result<ComputeReplicaConfig, PlanError> {
5351 let introspection_interval = introspection_interval
5352 .map(|OptionalDuration(i)| i)
5353 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5354 let introspection = match introspection_interval {
5355 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5356 interval,
5357 debugging: introspection_debugging,
5358 }),
5359 None if introspection_debugging => {
5360 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5361 }
5362 None => None,
5363 };
5364 let compute = ComputeReplicaConfig { introspection };
5365 Ok(compute)
5366}
5367
5368fn unplan_compute_replica_config(
5372 compute_replica_config: ComputeReplicaConfig,
5373) -> (Option<OptionalDuration>, bool) {
5374 match compute_replica_config.introspection {
5375 Some(ComputeReplicaIntrospectionConfig {
5376 debugging,
5377 interval,
5378 }) => (Some(OptionalDuration(Some(interval))), debugging),
5379 None => (Some(OptionalDuration(None)), false),
5380 }
5381}
5382
/// Plans the `SCHEDULE` cluster option, validating any `HYDRATION TIME
/// ESTIMATE` interval and converting it to a `Duration`.
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        // Refresh with no estimate defaults to a zero estimate.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            // Months have no fixed length, so the estimate would not be
            // convertible to a `Duration`.
            if interval.months != 0 {
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            // Bound the value so it survives the unplan roundtrip: it must
            // fit in u64 milliseconds and convert back into an `Interval`.
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}
5426
5427fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5431 match schedule {
5432 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5433 ClusterSchedule::Refresh {
5434 hydration_time_estimate,
5435 } => {
5436 let interval = Interval::from_duration(&hydration_time_estimate)
5437 .expect("planning ensured that this is convertible back to Interval");
5438 let interval_value = literal::unplan_interval(&interval);
5439 ClusterScheduleOptionValue::Refresh {
5440 hydration_time_estimate: Some(interval_value),
5441 }
5442 }
5443 }
5444}
5445
5446pub fn describe_create_cluster_replica(
5447 _: &StatementContext,
5448 _: CreateClusterReplicaStatement<Aug>,
5449) -> Result<StatementDesc, PlanError> {
5450 Ok(StatementDesc::new(None))
5451}
5452
/// Plans `CREATE CLUSTER REPLICA`.
pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    // Clusters hosting objects that require a single replica may not grow
    // beyond one replica.
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        // Internal replicas must not collide with the naming pattern reserved
        // for managed replicas.
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        // Non-internal replicas may only be added to unmanaged clusters.
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}
5489
5490pub fn describe_create_secret(
5491 _: &StatementContext,
5492 _: CreateSecretStatement<Aug>,
5493) -> Result<StatementDesc, PlanError> {
5494 Ok(StatementDesc::new(None))
5495}
5496
5497pub fn plan_create_secret(
5498 scx: &StatementContext,
5499 stmt: CreateSecretStatement<Aug>,
5500) -> Result<Plan, PlanError> {
5501 let CreateSecretStatement {
5502 name,
5503 if_not_exists,
5504 value,
5505 } = &stmt;
5506
5507 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5508 let mut create_sql_statement = stmt.clone();
5509 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5510 let create_sql =
5511 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5512 let secret_as = query::plan_secret_as(scx, value.clone())?;
5513
5514 let secret = Secret {
5515 create_sql,
5516 secret_as,
5517 };
5518
5519 Ok(Plan::CreateSecret(CreateSecretPlan {
5520 name,
5521 secret,
5522 if_not_exists: *if_not_exists,
5523 }))
5524}
5525
5526pub fn describe_create_connection(
5527 _: &StatementContext,
5528 _: CreateConnectionStatement<Aug>,
5529) -> Result<StatementDesc, PlanError> {
5530 Ok(StatementDesc::new(None))
5531}
5532
// Generates `CreateConnectionOptionExtracted`, which pulls the optional
// `VALIDATE` boolean out of `CREATE CONNECTION ... WITH (...)` options.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5534
/// Plans a `CREATE CONNECTION` statement.
///
/// Extracts the connection options into typed connection details, resolves
/// the connection's name, decides whether the connection should be validated
/// (explicit `VALIDATE` option, falling back to the system default), and — for
/// SSH connections — rewrites the persisted statement to embed the generated
/// public keys.
pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Destructure a clone: `stmt` itself is kept (and possibly mutated below)
    // so it can be normalized into the persisted `create_sql`.
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    // Explicit use of the VALIDATE syntax is feature-flag gated.
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    // Without an explicit VALIDATE option, validate only when both the system
    // default is on and this connection kind validates by default.
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Check for name collisions up front (unless IF NOT EXISTS was given).
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, replace any user-supplied public-key options with
    // the freshly generated key pair so the persisted SQL round-trips.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}
5602
5603fn plan_drop_database(
5604 scx: &StatementContext,
5605 if_exists: bool,
5606 name: &UnresolvedDatabaseName,
5607 cascade: bool,
5608) -> Result<Option<DatabaseId>, PlanError> {
5609 Ok(match resolve_database(scx, name, if_exists)? {
5610 Some(database) => {
5611 if !cascade && database.has_schemas() {
5612 sql_bail!(
5613 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5614 name,
5615 );
5616 }
5617 Some(database.id())
5618 }
5619 None => None,
5620 })
5621}
5622
5623pub fn describe_drop_objects(
5624 _: &StatementContext,
5625 _: DropObjectsStatement,
5626) -> Result<StatementDesc, PlanError> {
5627 Ok(StatementDesc::new(None))
5628}
5629
/// Plans a `DROP <object type> ...` statement for any droppable object kind.
///
/// Dispatches each name to the kind-specific drop planner, records a notice
/// for names that do not exist (only reachable with `IF EXISTS`), and expands
/// the referenced objects to the full set of dependent objects to drop.
pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    // `DROP FUNCTION` is rejected during parsing, so it can never reach the
    // planner.
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        // Each planner returns `None` when the object is missing and
        // `if_exists` allows that.
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            // Missing object with IF EXISTS: surface a notice instead of an
            // error.
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    // Expand to the transitive set of objects that must be dropped alongside.
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}
5688
/// Plans dropping a schema, returning its database/schema specifiers if it
/// exists.
///
/// Refuses to drop temporary schemas and ambient (system) schemas, and — with
/// RESTRICT — schemas that still contain items.
fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    // Catch an unqualified reference to the temporary schema by name before
    // resolution, so the error is consistent even if resolution would fail.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            // Ambient schemas belong to the system and may never be dropped.
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            // Also reject the temporary schema if resolution reached it via a
            // qualified name.
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}
5726
5727fn plan_drop_role(
5728 scx: &StatementContext,
5729 if_exists: bool,
5730 name: &Ident,
5731) -> Result<Option<RoleId>, PlanError> {
5732 match scx.catalog.resolve_role(name.as_str()) {
5733 Ok(role) => {
5734 let id = role.id();
5735 if &id == scx.catalog.active_role_id() {
5736 sql_bail!("current role cannot be dropped");
5737 }
5738 for role in scx.catalog.get_roles() {
5739 for (member_id, grantor_id) in role.membership() {
5740 if &id == grantor_id {
5741 let member_role = scx.catalog.get_role(member_id);
5742 sql_bail!(
5743 "cannot drop role {}: still depended up by membership of role {} in role {}",
5744 name.as_str(),
5745 role.name(),
5746 member_role.name()
5747 );
5748 }
5749 }
5750 }
5751 Ok(Some(role.id()))
5752 }
5753 Err(_) if if_exists => Ok(None),
5754 Err(e) => Err(e.into()),
5755 }
5756}
5757
5758fn plan_drop_cluster(
5759 scx: &StatementContext,
5760 if_exists: bool,
5761 name: &Ident,
5762 cascade: bool,
5763) -> Result<Option<ClusterId>, PlanError> {
5764 Ok(match resolve_cluster(scx, name, if_exists)? {
5765 Some(cluster) => {
5766 if !cascade && !cluster.bound_objects().is_empty() {
5767 return Err(PlanError::DependentObjectsStillExist {
5768 object_type: "cluster".to_string(),
5769 object_name: cluster.name().to_string(),
5770 dependents: Vec::new(),
5771 });
5772 }
5773 Some(cluster.id())
5774 }
5775 None => None,
5776 })
5777}
5778
5779fn plan_drop_network_policy(
5780 scx: &StatementContext,
5781 if_exists: bool,
5782 name: &Ident,
5783) -> Result<Option<NetworkPolicyId>, PlanError> {
5784 match scx.catalog.resolve_network_policy(name.as_str()) {
5785 Ok(policy) => {
5786 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5789 Err(PlanError::NetworkPolicyInUse)
5790 } else {
5791 Ok(Some(policy.id()))
5792 }
5793 }
5794 Err(_) if if_exists => Ok(None),
5795 Err(e) => Err(e.into()),
5796 }
5797}
5798
5799fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5802 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5804 false
5805 } else {
5806 cluster.bound_objects().iter().any(|id| {
5808 let item = scx.catalog.get_item(id);
5809 matches!(
5810 item.item_type(),
5811 CatalogItemType::Sink | CatalogItemType::Source
5812 )
5813 })
5814 }
5815}
5816
5817fn plan_drop_cluster_replica(
5818 scx: &StatementContext,
5819 if_exists: bool,
5820 name: &QualifiedReplica,
5821) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5822 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5823 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5824}
5825
/// Plans dropping a catalog item (table, view, source, ...), returning its ID
/// if it exists.
///
/// System items may never be dropped. Without CASCADE, an item that other
/// drop-preventing items depend on cannot be dropped either.
fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        // Special-case `DROP VIEW` on a materialized view to produce a more
        // helpful, targeted error.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            // RESTRICT semantics: reject the drop if any dependent would be
            // invalidated (indexes alone do not block a drop; see
            // `dependency_prevents_drop`).
            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}
5882
/// Reports whether the existence of dependent item `dep` should block a
/// RESTRICT drop of an object of kind `object_type`.
///
/// Dropping a type is always blocked by any dependent. For every other object
/// kind, all dependents block the drop except indexes, which are implicitly
/// dropped along with the object they index.
///
/// Both matches are deliberately exhaustive (no `_` arm) so that adding a new
/// object or item kind forces this policy to be revisited.
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        ObjectType::Table
        | ObjectType::View
        | ObjectType::MaterializedView
        | ObjectType::Source
        | ObjectType::Sink
        | ObjectType::Index
        | ObjectType::Role
        | ObjectType::Cluster
        | ObjectType::ClusterReplica
        | ObjectType::Secret
        | ObjectType::Connection
        | ObjectType::Database
        | ObjectType::Schema
        | ObjectType::Func
        | ObjectType::ContinualTask
        | ObjectType::NetworkPolicy => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            // Indexes are dropped together with their indexed object.
            CatalogItemType::Index => false,
        },
    }
}
5917
5918pub fn describe_alter_index_options(
5919 _: &StatementContext,
5920 _: AlterIndexStatement<Aug>,
5921) -> Result<StatementDesc, PlanError> {
5922 Ok(StatementDesc::new(None))
5923}
5924
5925pub fn describe_drop_owned(
5926 _: &StatementContext,
5927 _: DropOwnedStatement<Aug>,
5928) -> Result<StatementDesc, PlanError> {
5929 Ok(StatementDesc::new(None))
5930}
5931
/// Plans a `DROP OWNED BY ...` statement.
///
/// Walks every object class in the catalog (replicas, clusters, items,
/// schemas, databases, network policies) and collects three things:
///   * the IDs of objects owned by any of the named roles (to drop),
///   * privilege grants made to those roles (to revoke),
///   * default privileges created by or granted to those roles (to revoke).
///
/// Without CASCADE, the plan fails if dropping an owned object would strand a
/// dependent that is NOT owned by one of the named roles. Dropping is always
/// refused when any affected object is a system object.
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    // Collects, for one object, every privilege granted to any of the target
    // roles, pairing each with the object's ID.
    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Cluster replicas owned by the target roles are dropped unconditionally;
    // replicas have no dependents or privileges of their own.
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters: without CASCADE, a cluster may only be dropped if every object
    // bound to it is also owned by one of the target roles.
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        // Privileges on the cluster are revoked regardless of ownership.
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items: without CASCADE, dependents that would block a normal drop (see
    // `dependency_prevents_drop`) and that are not owned by a target role make
    // the whole statement fail.
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string()
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                // A source's progress collection counts as part of the source:
                // its dependents must be checked too.
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas: temporary schemas are per-session and skipped entirely.
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases: without CASCADE, all contained schemas must also be owned by
    // a target role.
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Network policies owned by the target roles are dropped unconditionally.
    for network_policy in scx.catalog.get_network_policies() {
        if role_ids.contains(&network_policy.owner_id()) {
            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
        }
        update_privilege_revokes(
            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
            network_policy.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System-wide privileges granted to the target roles.
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    // Default privileges either created by a target role or granted to one.
    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    // Expand to all transitively dependent objects.
    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    // If any object in the final drop set is a system object, refuse the
    // whole statement and name the offending owners.
    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}
6171
6172fn plan_retain_history_option(
6173 scx: &StatementContext,
6174 retain_history: Option<OptionalDuration>,
6175) -> Result<Option<CompactionWindow>, PlanError> {
6176 if let Some(OptionalDuration(lcw)) = retain_history {
6177 Ok(Some(plan_retain_history(scx, lcw)?))
6178 } else {
6179 Ok(None)
6180 }
6181}
6182
/// Plans a `RETAIN HISTORY` value into a [`CompactionWindow`].
///
/// `lcw` is the parsed duration: `Some(d)` for an explicit duration and
/// `None` for `RETAIN HISTORY = FOR ...` with no value, which means "retain
/// forever" (compaction disabled) and is gated behind the unlimited-retain
/// feature flag. Durations below the default compaction window are likewise
/// only allowed when that flag is enabled.
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A zero duration should have been normalized away earlier, so treat
        // it as an internal error rather than silently disabling compaction.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Windows shorter than the default are only permitted with the
            // unlimited-retain-history flag.
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // No duration: retain history forever, behind the same flag.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}
6232
// Generates `IndexOptionExtracted`, which pulls the optional `RETAIN HISTORY`
// duration out of index WITH options.
generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6234
6235fn plan_index_options(
6236 scx: &StatementContext,
6237 with_opts: Vec<IndexOption<Aug>>,
6238) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6239 if !with_opts.is_empty() {
6240 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6242 }
6243
6244 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6245
6246 let mut out = Vec::with_capacity(1);
6247 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6248 out.push(crate::plan::IndexOption::RetainHistory(cw));
6249 }
6250 Ok(out)
6251}
6252
// Generates `TableOptionExtracted`, which pulls `PARTITION BY`,
// `RETAIN HISTORY`, and the test-only `REDACTED` option out of table WITH
// options.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
6259
6260fn plan_table_options(
6261 scx: &StatementContext,
6262 desc: &RelationDesc,
6263 with_opts: Vec<TableOption<Aug>>,
6264) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6265 let TableOptionExtracted {
6266 partition_by,
6267 retain_history,
6268 redacted_test,
6269 ..
6270 }: TableOptionExtracted = with_opts.try_into()?;
6271
6272 if let Some(partition_by) = partition_by {
6273 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6274 check_partition_by(desc, partition_by)?;
6275 }
6276
6277 if redacted_test.is_some() {
6278 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6279 }
6280
6281 let mut out = Vec::with_capacity(1);
6282 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6283 out.push(crate::plan::TableOption::RetainHistory(cw));
6284 }
6285 Ok(out)
6286}
6287
/// Plans an `ALTER INDEX ... SET/RESET (...)` statement.
///
/// `RETAIN HISTORY` is the only supported index option; SET passes the given
/// value through and RESET passes `None` (restoring the default). Either way
/// it must be the sole option in the list.
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        // RETAIN HISTORY cannot be combined with other options.
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be only option");
                        }
                        // RESET: `None` restores the default retain history.
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        // RETAIN HISTORY cannot be combined with other options.
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be only option");
                        }
                        // SET: forward the user-provided value.
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}
6340
6341pub fn describe_alter_cluster_set_options(
6342 _: &StatementContext,
6343 _: AlterClusterStatement<Aug>,
6344) -> Result<StatementDesc, PlanError> {
6345 Ok(StatementDesc::new(None))
6346}
6347
/// Plans an `ALTER CLUSTER ... SET/RESET (...)` statement.
///
/// Validates the requested option changes against the cluster's current
/// managedness and schedule (managed clusters reject `REPLICAS`; unmanaged
/// clusters reject the managed-only sizing/introspection options), enforces
/// the single-replica restriction for clusters hosting sources/sinks, and
/// translates each option into an [`AlterOptionParameter`] on the plan.
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Missing cluster with IF EXISTS: emit a notice and plan a no-op.
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            // WORKLOAD CLASS is reserved for system users.
            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            // Validate against the *target* managedness: an explicit MANAGED
            // option wins, otherwise the cluster's current state.
            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    // Any non-trivial alter strategy (zero-downtime
                    // reconfiguration) is feature-flag gated.
                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    // Non-MANUAL schedules are feature-flag gated.
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        // REPLICATION FACTOR is incompatible with non-MANUAL
                        // schedules, whether being set now or already present.
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        // Clusters hosting sources/sinks may not exceed one
                        // replica.
                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // A zero-downtime strategy temporarily doubles the
                        // replica set, which single-replica objects forbid.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    // Unmanaged clusters accept none of the managed-only
                    // options.
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    // Switching to unmanaged requires explicitly setting a
                    // non-MANUAL schedule back to MANUAL.
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            // Translate each provided option into a Set parameter.
            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            // DISK is deprecated: reject it for modern ("cc") sizes, where
            // disk is always on, and warn otherwise.
            if disk.is_some() {
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            // WORKLOAD CLASS is reserved for system users, also on RESET.
            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    // DISK is deprecated; resetting it only warns.
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}
6600
6601pub fn describe_alter_set_cluster(
6602 _: &StatementContext,
6603 _: AlterSetClusterStatement<Aug>,
6604) -> Result<StatementDesc, PlanError> {
6605 Ok(StatementDesc::new(None))
6606}
6607
/// Plans `ALTER <object> ... SET CLUSTER <cluster>`.
///
/// Only materialized views may currently move between clusters. Cluster-backed
/// objects (index, sink, source) report "not yet supported"; all other object
/// types have no associated cluster and report "never supported". A missing
/// object with `IF EXISTS` yields a notice and a no-op plan.
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Gate by object type before touching the catalog.
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        ObjectType::Table
        | ObjectType::View
        | ObjectType::Type
        | ObjectType::Role
        | ObjectType::Cluster
        | ObjectType::ClusterReplica
        | ObjectType::Secret
        | ObjectType::Connection
        | ObjectType::Database
        | ObjectType::Schema
        | ObjectType::Func
        | ObjectType::ContinualTask
        | ObjectType::NetworkPolicy => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            // Moving to the cluster the object is already on is a no-op.
            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            // Missing object with IF EXISTS: emit a notice, plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6676
6677pub fn describe_alter_object_rename(
6678 _: &StatementContext,
6679 _: AlterObjectRenameStatement,
6680) -> Result<StatementDesc, PlanError> {
6681 Ok(StatementDesc::new(None))
6682}
6683
/// Plans `ALTER <object> ... RENAME TO ...` by dispatching on the object type
/// to the item, cluster, cluster-replica, or schema rename planner.
///
/// The final arm is unreachable because the parser pairs each object type with
/// the matching `UnresolvedObjectName` variant.
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}
6720
/// Plans `ALTER SCHEMA ... RENAME TO ...`.
pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    // The temporary schema lives in the ambient database and cannot be renamed.
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!(
            "cannot rename schemas in the ambient database: {:?}",
            mz_repr::namespaces::MZ_TEMP_SCHEMA
        );
    }

    // Unknown schema with IF EXISTS: emit a notice, plan a no-op.
    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Refuse to rename onto a schema name already taken in the same database.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    // System schemas can never be renamed.
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}
6768
/// Plans `ALTER SCHEMA ... SWAP WITH ...`.
///
/// `gen_temp_suffix` produces the suffix for the temporary schema name used
/// during the swap; the predicate handed to it returns `true` when a candidate
/// suffix does not collide with an existing schema in the same database.
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    if_exists: bool,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    // The temporary schema (ambient database) cannot take part in a swap.
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    let schema_a = match scx.resolve_schema(name_a.clone()) {
        Ok(schema) => schema,
        Err(_) if if_exists => {
            // Schema A missing with IF EXISTS: emit a notice, plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name_a.to_ast_string_simple(),
                object_type: ObjectType::Schema,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Schema,
            }));
        }
        Err(e) => return Err(e),
    };

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    // Schema B is resolved within schema A's database.
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Find a temporary schema name that is currently unused in this database.
    const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}
6839
/// Plans `ALTER <item> ... RENAME TO ...` for schema-qualified catalog items.
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // `ALTER VIEW` on a materialized view gets a dedicated error.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // The renamed item keeps its qualifiers; only the final name
            // component changes.
            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // Check both the item and type namespaces for collisions,
            // mirroring the create-time uniqueness rules.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            // Missing item with IF EXISTS: emit a notice, plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6908
/// Plans `ALTER CLUSTER ... RENAME TO ...`.
pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            // NOTE(review): uses `ident(..)` where sibling rename planners use
            // `normalize::ident(..)` — confirm the two are equivalent here.
            to_name: ident(to_name),
        })),
        None => {
            // Missing cluster with IF EXISTS: emit a notice, plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6932
/// Plans `ALTER CLUSTER ... SWAP WITH ...`.
///
/// `gen_temp_suffix` produces the suffix for the temporary cluster name used
/// during the swap; the predicate handed to it returns `true` when a candidate
/// suffix is free.
pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    if_exists: bool,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
        Ok(cluster) => cluster,
        Err(_) if if_exists => {
            // Cluster A missing with IF EXISTS: emit a notice, plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name_a.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
        Err(e) => return Err(e),
    };
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
        temp_name.append_lossy(temp_suffix);
        // A suffix is usable only when the lookup fails with UnknownCluster;
        // success or any other error conservatively counts as "taken".
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            Err(CatalogError::UnknownCluster(_)) => true,
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}
6980
/// Plans `ALTER CLUSTER REPLICA ... RENAME TO ...`.
pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            // Replicas of managed clusters cannot be renamed directly.
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            // Missing replica with IF EXISTS: emit a notice, plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
7013
7014pub fn describe_alter_object_swap(
7015 _: &StatementContext,
7016 _: AlterObjectSwapStatement,
7017) -> Result<StatementDesc, PlanError> {
7018 Ok(StatementDesc::new(None))
7019}
7020
/// Plans `ALTER ... SWAP WITH ...`, dispatching to the schema or cluster
/// implementation. Only schemas and clusters support swapping.
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        if_exists,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // Try random suffixes for the temporary object used during the swap,
    // giving up after 10 collisions.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
        }
        (ObjectType::Schema | ObjectType::Cluster, _, _) => {
            unreachable!("parser ensures name type matches object type")
        }
        (
            ObjectType::Table
            | ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Source
            | ObjectType::Sink
            | ObjectType::Index
            | ObjectType::Type
            | ObjectType::Role
            | ObjectType::ClusterReplica
            | ObjectType::Secret
            | ObjectType::Connection
            | ObjectType::Database
            | ObjectType::Func
            | ObjectType::ContinualTask
            | ObjectType::NetworkPolicy,
            _,
            _,
        ) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}
7089
7090pub fn describe_alter_retain_history(
7091 _: &StatementContext,
7092 _: AlterRetainHistoryStatement<Aug>,
7093) -> Result<StatementDesc, PlanError> {
7094 Ok(StatementDesc::new(None))
7095}
7096
7097pub fn plan_alter_retain_history(
7098 scx: &StatementContext,
7099 AlterRetainHistoryStatement {
7100 object_type,
7101 if_exists,
7102 name,
7103 history,
7104 }: AlterRetainHistoryStatement<Aug>,
7105) -> Result<Plan, PlanError> {
7106 alter_retain_history(scx, object_type.into(), if_exists, name, history)
7107}
7108
/// Shared implementation of `ALTER ... SET/RESET (RETAIN HISTORY ...)`.
///
/// A `history` of `None` means RESET: the retain-history window falls back to
/// the default logical compaction window.
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    // Only item-level relation objects support RETAIN HISTORY.
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // `ALTER VIEW` on a materialized view gets a dedicated error;
            // a plain view never supports RETAIN HISTORY, and any other
            // object-type/item-type mismatch gets a generic error.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // SET carries an explicit window value; RESET (None) restores the
            // default logical compaction window.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            // Missing object with IF EXISTS: emit a notice, plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
7180
/// Shared implementation of `ALTER SOURCE ... SET/RESET (TIMESTAMP INTERVAL ...)`.
///
/// A `value` of `None` means RESET: the interval falls back to the
/// system-configured default.
fn alter_source_timestamp_interval(
    scx: &StatementContext,
    if_exists: bool,
    source_name: UnresolvedItemName,
    value: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Source;
    match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            if entry.item_type() != CatalogItemType::Source {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            match value {
                Some(val) => {
                    let val = match val {
                        WithOptionValue::Value(v) => v,
                        _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
                    };
                    let duration = Duration::try_from_value(val.clone())?;

                    // Enforce the system-configured interval bounds.
                    let min = scx.catalog.system_vars().min_timestamp_interval();
                    let max = scx.catalog.system_vars().max_timestamp_interval();
                    if duration < min || duration > max {
                        return Err(PlanError::InvalidTimestampInterval {
                            min,
                            max,
                            requested: duration,
                        });
                    }

                    Ok(Plan::AlterSourceTimestampInterval(
                        AlterSourceTimestampIntervalPlan {
                            id: entry.id(),
                            value: Some(val),
                            interval: duration,
                        },
                    ))
                }
                None => {
                    // RESET: revert to the default interval.
                    let interval = scx.catalog.system_vars().default_timestamp_interval();
                    Ok(Plan::AlterSourceTimestampInterval(
                        AlterSourceTimestampIntervalPlan {
                            id: entry.id(),
                            value: None,
                            interval,
                        },
                    ))
                }
            }
        }
        None => {
            // Missing source with IF EXISTS: emit a notice, plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: source_name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
7248
7249pub fn describe_alter_secret_options(
7250 _: &StatementContext,
7251 _: AlterSecretStatement<Aug>,
7252) -> Result<StatementDesc, PlanError> {
7253 Ok(StatementDesc::new(None))
7254}
7255
7256pub fn plan_alter_secret(
7257 scx: &mut StatementContext,
7258 stmt: AlterSecretStatement<Aug>,
7259) -> Result<Plan, PlanError> {
7260 let AlterSecretStatement {
7261 name,
7262 if_exists,
7263 value,
7264 } = stmt;
7265 let object_type = ObjectType::Secret;
7266 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7267 Some(entry) => entry.id(),
7268 None => {
7269 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7270 name: name.to_string(),
7271 object_type,
7272 });
7273
7274 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7275 }
7276 };
7277
7278 let secret_as = query::plan_secret_as(scx, value)?;
7279
7280 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7281}
7282
7283pub fn describe_alter_connection(
7284 _: &StatementContext,
7285 _: AlterConnectionStatement<Aug>,
7286) -> Result<StatementDesc, PlanError> {
7287 Ok(StatementDesc::new(None))
7288}
7289
// Generates `AlterConnectionOptionExtracted`, which pulls the `VALIDATE <bool>`
// option out of `ALTER CONNECTION ... WITH (...)`.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7291
7292pub fn plan_alter_connection(
7293 scx: &StatementContext,
7294 stmt: AlterConnectionStatement<Aug>,
7295) -> Result<Plan, PlanError> {
7296 let AlterConnectionStatement {
7297 name,
7298 if_exists,
7299 actions,
7300 with_options,
7301 } = stmt;
7302 let conn_name = normalize::unresolved_item_name(name)?;
7303 let entry = match scx.catalog.resolve_item(&conn_name) {
7304 Ok(entry) => entry,
7305 Err(_) if if_exists => {
7306 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7307 name: conn_name.to_string(),
7308 object_type: ObjectType::Sink,
7309 });
7310
7311 return Ok(Plan::AlterNoop(AlterNoopPlan {
7312 object_type: ObjectType::Connection,
7313 }));
7314 }
7315 Err(e) => return Err(e.into()),
7316 };
7317
7318 let connection = entry.connection()?;
7319
7320 if actions
7321 .iter()
7322 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7323 {
7324 if actions.len() > 1 {
7325 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7326 }
7327
7328 if !with_options.is_empty() {
7329 sql_bail!(
7330 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7331 with_options
7332 .iter()
7333 .map(|o| o.to_ast_string_simple())
7334 .join(", ")
7335 );
7336 }
7337
7338 if !matches!(connection, Connection::Ssh(_)) {
7339 sql_bail!(
7340 "{} is not an SSH connection",
7341 scx.catalog.resolve_full_name(entry.name())
7342 )
7343 }
7344
7345 return Ok(Plan::AlterConnection(AlterConnectionPlan {
7346 id: entry.id(),
7347 action: crate::plan::AlterConnectionAction::RotateKeys,
7348 }));
7349 }
7350
7351 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7352 if options.validate.is_some() {
7353 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7354 }
7355
7356 let validate = match options.validate {
7357 Some(val) => val,
7358 None => {
7359 scx.catalog
7360 .system_vars()
7361 .enable_default_connection_validation()
7362 && connection.validate_by_default()
7363 }
7364 };
7365
7366 let connection_type = match connection {
7367 Connection::Aws(_) => CreateConnectionType::Aws,
7368 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7369 Connection::Kafka(_) => CreateConnectionType::Kafka,
7370 Connection::Csr(_) => CreateConnectionType::Csr,
7371 Connection::Postgres(_) => CreateConnectionType::Postgres,
7372 Connection::Ssh(_) => CreateConnectionType::Ssh,
7373 Connection::MySql(_) => CreateConnectionType::MySql,
7374 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7375 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7376 };
7377
7378 let specified_options: BTreeSet<_> = actions
7380 .iter()
7381 .map(|action: &AlterConnectionAction<Aug>| match action {
7382 AlterConnectionAction::SetOption(option) => option.name.clone(),
7383 AlterConnectionAction::DropOption(name) => name.clone(),
7384 AlterConnectionAction::RotateKeys => unreachable!(),
7385 })
7386 .collect();
7387
7388 for invalid in INALTERABLE_OPTIONS {
7389 if specified_options.contains(invalid) {
7390 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7391 }
7392 }
7393
7394 connection::validate_options_per_connection_type(connection_type, specified_options)?;
7395
7396 let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
7398 actions.into_iter().partition_map(|action| match action {
7399 AlterConnectionAction::SetOption(option) => Either::Left(option),
7400 AlterConnectionAction::DropOption(name) => Either::Right(name),
7401 AlterConnectionAction::RotateKeys => unreachable!(),
7402 });
7403
7404 let set_options: BTreeMap<_, _> = set_options_vec
7405 .clone()
7406 .into_iter()
7407 .map(|option| (option.name, option.value))
7408 .collect();
7409
7410 let connection_options_extracted =
7414 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7415
7416 let duplicates: Vec<_> = connection_options_extracted
7417 .seen
7418 .intersection(&drop_options)
7419 .collect();
7420
7421 if !duplicates.is_empty() {
7422 sql_bail!(
7423 "cannot both SET and DROP/RESET options {}",
7424 duplicates
7425 .iter()
7426 .map(|option| option.to_string())
7427 .join(", ")
7428 )
7429 }
7430
7431 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7432 let set_options_count = mutually_exclusive_options
7433 .iter()
7434 .filter(|o| set_options.contains_key(o))
7435 .count();
7436 let drop_options_count = mutually_exclusive_options
7437 .iter()
7438 .filter(|o| drop_options.contains(o))
7439 .count();
7440
7441 if set_options_count > 0 && drop_options_count > 0 {
7443 sql_bail!(
7444 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7445 connection_type,
7446 mutually_exclusive_options
7447 .iter()
7448 .map(|option| option.to_string())
7449 .join(", ")
7450 )
7451 }
7452
7453 if set_options_count > 0 || drop_options_count > 0 {
7458 drop_options.extend(mutually_exclusive_options.iter().cloned());
7459 }
7460
7461 }
7464
7465 Ok(Plan::AlterConnection(AlterConnectionPlan {
7466 id: entry.id(),
7467 action: crate::plan::AlterConnectionAction::AlterOptions {
7468 set_options,
7469 drop_options,
7470 validate,
7471 },
7472 }))
7473}
7474
7475pub fn describe_alter_sink(
7476 _: &StatementContext,
7477 _: AlterSinkStatement<Aug>,
7478) -> Result<StatementDesc, PlanError> {
7479 Ok(StatementDesc::new(None))
7480}
7481
/// Plans `ALTER SINK`.
///
/// `CHANGE RELATION` re-parses and re-plans the sink's original `CREATE SINK`
/// statement with the new upstream relation substituted in, then bumps the
/// sink version. SET/RESET options are not supported.
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    // Missing sink with IF EXISTS: emit a notice, plan a no-op.
    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // Round-trip through the stored CREATE SQL so the new plan keeps
            // every other property of the existing sink.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Substitute the new upstream relation and re-plan.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            // Each relation change produces a new sink version.
            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}
7541
7542pub fn describe_alter_source(
7543 _: &StatementContext,
7544 _: AlterSourceStatement<Aug>,
7545) -> Result<StatementDesc, PlanError> {
7546 Ok(StatementDesc::new(None))
7548}
7549
// Generates `AlterSourceAddSubsourceOptionExtracted`, which pulls the
// TEXT COLUMNS, EXCLUDE COLUMNS, and DETAILS options out of
// `ALTER SOURCE ... ADD SUBSOURCE`.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7556
/// Plans `ALTER SOURCE`.
///
/// Only the RETAIN HISTORY and TIMESTAMP INTERVAL options may be SET or RESET,
/// each as the sole option in the statement. Subsource addition and reference
/// refresh never reach here — they are handled during purification.
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    // Missing source with IF EXISTS: emit a notice, plan a no-op.
    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            // presumably the parser guarantees at least one option here —
            // TODO confirm the `unwrap` cannot trip on empty input.
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            if option.name == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            // presumably the parser guarantees at least one option here —
            // TODO confirm the `unwrap` cannot trip on empty input.
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            if option == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}
7643
7644pub fn describe_alter_system_set(
7645 _: &StatementContext,
7646 _: AlterSystemSetStatement,
7647) -> Result<StatementDesc, PlanError> {
7648 Ok(StatementDesc::new(None))
7649}
7650
7651pub fn plan_alter_system_set(
7652 _: &StatementContext,
7653 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7654) -> Result<Plan, PlanError> {
7655 let name = name.to_string();
7656 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7657 name,
7658 value: scl::plan_set_variable_to(to)?,
7659 }))
7660}
7661
7662pub fn describe_alter_system_reset(
7663 _: &StatementContext,
7664 _: AlterSystemResetStatement,
7665) -> Result<StatementDesc, PlanError> {
7666 Ok(StatementDesc::new(None))
7667}
7668
7669pub fn plan_alter_system_reset(
7670 _: &StatementContext,
7671 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7672) -> Result<Plan, PlanError> {
7673 let name = name.to_string();
7674 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7675}
7676
7677pub fn describe_alter_system_reset_all(
7678 _: &StatementContext,
7679 _: AlterSystemResetAllStatement,
7680) -> Result<StatementDesc, PlanError> {
7681 Ok(StatementDesc::new(None))
7682}
7683
7684pub fn plan_alter_system_reset_all(
7685 _: &StatementContext,
7686 _: AlterSystemResetAllStatement,
7687) -> Result<Plan, PlanError> {
7688 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7689}
7690
7691pub fn describe_alter_role(
7692 _: &StatementContext,
7693 _: AlterRoleStatement<Aug>,
7694) -> Result<StatementDesc, PlanError> {
7695 Ok(StatementDesc::new(None))
7696}
7697
7698pub fn plan_alter_role(
7699 scx: &StatementContext,
7700 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7701) -> Result<Plan, PlanError> {
7702 let option = match option {
7703 AlterRoleOption::Attributes(attrs) => {
7704 let attrs = plan_role_attributes(attrs, scx)?;
7705 PlannedAlterRoleOption::Attributes(attrs)
7706 }
7707 AlterRoleOption::Variable(variable) => {
7708 let var = plan_role_variable(variable)?;
7709 PlannedAlterRoleOption::Variable(var)
7710 }
7711 };
7712
7713 Ok(Plan::AlterRole(AlterRolePlan {
7714 id: name.id,
7715 name: name.name,
7716 option,
7717 }))
7718}
7719
7720pub fn describe_alter_table_add_column(
7721 _: &StatementContext,
7722 _: AlterTableAddColumnStatement<Aug>,
7723) -> Result<StatementDesc, PlanError> {
7724 Ok(StatementDesc::new(None))
7725}
7726
7727pub fn plan_alter_table_add_column(
7728 scx: &StatementContext,
7729 stmt: AlterTableAddColumnStatement<Aug>,
7730) -> Result<Plan, PlanError> {
7731 let AlterTableAddColumnStatement {
7732 if_exists,
7733 name,
7734 if_col_not_exist,
7735 column_name,
7736 data_type,
7737 } = stmt;
7738 let object_type = ObjectType::Table;
7739
7740 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7741
7742 let (relation_id, item_name, desc) =
7743 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7744 Some(item) => {
7745 let item_name = scx.catalog.resolve_full_name(item.name());
7747 let item = item.at_version(RelationVersionSelector::Latest);
7748 let desc = item.relation_desc().expect("table has desc").into_owned();
7749 (item.id(), item_name, desc)
7750 }
7751 None => {
7752 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7753 name: name.to_ast_string_simple(),
7754 object_type,
7755 });
7756 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7757 }
7758 };
7759
7760 let column_name = ColumnName::from(column_name.as_str());
7761 if desc.get_by_name(&column_name).is_some() {
7762 if if_col_not_exist {
7763 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7764 column_name: column_name.to_string(),
7765 object_name: item_name.item,
7766 });
7767 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7768 } else {
7769 return Err(PlanError::ColumnAlreadyExists {
7770 column_name,
7771 object_name: item_name.item,
7772 });
7773 }
7774 }
7775
7776 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7777 let column_type = scalar_type.nullable(true);
7779 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7781
7782 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7783 relation_id,
7784 column_name,
7785 column_type,
7786 raw_sql_type,
7787 }))
7788}
7789
7790pub fn describe_alter_materialized_view_apply_replacement(
7791 _: &StatementContext,
7792 _: AlterMaterializedViewApplyReplacementStatement,
7793) -> Result<StatementDesc, PlanError> {
7794 Ok(StatementDesc::new(None))
7795}
7796
7797pub fn plan_alter_materialized_view_apply_replacement(
7798 scx: &StatementContext,
7799 stmt: AlterMaterializedViewApplyReplacementStatement,
7800) -> Result<Plan, PlanError> {
7801 let AlterMaterializedViewApplyReplacementStatement {
7802 if_exists,
7803 name,
7804 replacement_name,
7805 } = stmt;
7806
7807 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7808
7809 let object_type = ObjectType::MaterializedView;
7810 let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7811 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7812 name: name.to_ast_string_simple(),
7813 object_type,
7814 });
7815 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7816 };
7817
7818 let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7819 .expect("if_exists not set");
7820
7821 if replacement.replacement_target() != Some(mv.id()) {
7822 return Err(PlanError::InvalidReplacement {
7823 item_type: mv.item_type(),
7824 item_name: scx.catalog.minimal_qualification(mv.name()),
7825 replacement_type: replacement.item_type(),
7826 replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7827 });
7828 }
7829
7830 Ok(Plan::AlterMaterializedViewApplyReplacement(
7831 AlterMaterializedViewApplyReplacementPlan {
7832 id: mv.id(),
7833 replacement_id: replacement.id(),
7834 },
7835 ))
7836}
7837
7838pub fn describe_comment(
7839 _: &StatementContext,
7840 _: CommentStatement<Aug>,
7841) -> Result<StatementDesc, PlanError> {
7842 Ok(StatementDesc::new(None))
7843}
7844
7845pub fn plan_comment(
7846 scx: &mut StatementContext,
7847 stmt: CommentStatement<Aug>,
7848) -> Result<Plan, PlanError> {
7849 const MAX_COMMENT_LENGTH: usize = 1024;
7850
7851 let CommentStatement { object, comment } = stmt;
7852
7853 if let Some(c) = &comment {
7855 if c.len() > 1024 {
7856 return Err(PlanError::CommentTooLong {
7857 length: c.len(),
7858 max_size: MAX_COMMENT_LENGTH,
7859 });
7860 }
7861 }
7862
7863 let (object_id, column_pos) = match &object {
7864 com_ty @ CommentObjectType::Table { name }
7865 | com_ty @ CommentObjectType::View { name }
7866 | com_ty @ CommentObjectType::MaterializedView { name }
7867 | com_ty @ CommentObjectType::Index { name }
7868 | com_ty @ CommentObjectType::Func { name }
7869 | com_ty @ CommentObjectType::Connection { name }
7870 | com_ty @ CommentObjectType::Source { name }
7871 | com_ty @ CommentObjectType::Sink { name }
7872 | com_ty @ CommentObjectType::Secret { name }
7873 | com_ty @ CommentObjectType::ContinualTask { name } => {
7874 let item = scx.get_item_by_resolved_name(name)?;
7875 match (com_ty, item.item_type()) {
7876 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7877 (CommentObjectId::Table(item.id()), None)
7878 }
7879 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7880 (CommentObjectId::View(item.id()), None)
7881 }
7882 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7883 (CommentObjectId::MaterializedView(item.id()), None)
7884 }
7885 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7886 (CommentObjectId::Index(item.id()), None)
7887 }
7888 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7889 (CommentObjectId::Func(item.id()), None)
7890 }
7891 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7892 (CommentObjectId::Connection(item.id()), None)
7893 }
7894 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7895 (CommentObjectId::Source(item.id()), None)
7896 }
7897 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7898 (CommentObjectId::Sink(item.id()), None)
7899 }
7900 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7901 (CommentObjectId::Secret(item.id()), None)
7902 }
7903 (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7904 (CommentObjectId::ContinualTask(item.id()), None)
7905 }
7906 (com_ty, cat_ty) => {
7907 let expected_type = match com_ty {
7908 CommentObjectType::Table { .. } => ObjectType::Table,
7909 CommentObjectType::View { .. } => ObjectType::View,
7910 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7911 CommentObjectType::Index { .. } => ObjectType::Index,
7912 CommentObjectType::Func { .. } => ObjectType::Func,
7913 CommentObjectType::Connection { .. } => ObjectType::Connection,
7914 CommentObjectType::Source { .. } => ObjectType::Source,
7915 CommentObjectType::Sink { .. } => ObjectType::Sink,
7916 CommentObjectType::Secret { .. } => ObjectType::Secret,
7917 _ => unreachable!("these are the only types we match on"),
7918 };
7919
7920 return Err(PlanError::InvalidObjectType {
7921 expected_type: SystemObjectType::Object(expected_type),
7922 actual_type: SystemObjectType::Object(cat_ty.into()),
7923 object_name: item.name().item.clone(),
7924 });
7925 }
7926 }
7927 }
7928 CommentObjectType::Type { ty } => match ty {
7929 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7930 sql_bail!("cannot comment on anonymous list or map type");
7931 }
7932 ResolvedDataType::Named { id, modifiers, .. } => {
7933 if !modifiers.is_empty() {
7934 sql_bail!("cannot comment on type with modifiers");
7935 }
7936 (CommentObjectId::Type(*id), None)
7937 }
7938 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7939 },
7940 CommentObjectType::Column { name } => {
7941 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7942 match item.item_type() {
7943 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7944 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7945 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7946 CatalogItemType::MaterializedView => {
7947 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7948 }
7949 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7950 r => {
7951 return Err(PlanError::Unsupported {
7952 feature: format!("Specifying comments on a column of {r}"),
7953 discussion_no: None,
7954 });
7955 }
7956 }
7957 }
7958 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7959 CommentObjectType::Database { name } => {
7960 (CommentObjectId::Database(*name.database_id()), None)
7961 }
7962 CommentObjectType::Schema { name } => {
7963 if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7967 sql_bail!(
7968 "cannot comment on schema {} because it is a temporary schema",
7969 mz_repr::namespaces::MZ_TEMP_SCHEMA
7970 );
7971 }
7972 (
7973 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7974 None,
7975 )
7976 }
7977 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7978 CommentObjectType::ClusterReplica { name } => {
7979 let replica = scx.catalog.resolve_cluster_replica(name)?;
7980 (
7981 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7982 None,
7983 )
7984 }
7985 CommentObjectType::NetworkPolicy { name } => {
7986 (CommentObjectId::NetworkPolicy(name.id), None)
7987 }
7988 };
7989
7990 if let Some(p) = column_pos {
7996 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7997 max_num_columns: MAX_NUM_COLUMNS,
7998 req_num_columns: p,
7999 })?;
8000 }
8001
8002 Ok(Plan::Comment(CommentPlan {
8003 object_id,
8004 sub_component: column_pos,
8005 comment,
8006 }))
8007}
8008
8009pub(crate) fn resolve_cluster<'a>(
8010 scx: &'a StatementContext,
8011 name: &'a Ident,
8012 if_exists: bool,
8013) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
8014 match scx.resolve_cluster(Some(name)) {
8015 Ok(cluster) => Ok(Some(cluster)),
8016 Err(_) if if_exists => Ok(None),
8017 Err(e) => Err(e),
8018 }
8019}
8020
8021pub(crate) fn resolve_cluster_replica<'a>(
8022 scx: &'a StatementContext,
8023 name: &QualifiedReplica,
8024 if_exists: bool,
8025) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
8026 match scx.resolve_cluster(Some(&name.cluster)) {
8027 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
8028 Some(replica_id) => Ok(Some((cluster, *replica_id))),
8029 None if if_exists => Ok(None),
8030 None => Err(sql_err!(
8031 "CLUSTER {} has no CLUSTER REPLICA named {}",
8032 cluster.name(),
8033 name.replica.as_str().quoted(),
8034 )),
8035 },
8036 Err(_) if if_exists => Ok(None),
8037 Err(e) => Err(e),
8038 }
8039}
8040
8041pub(crate) fn resolve_database<'a>(
8042 scx: &'a StatementContext,
8043 name: &'a UnresolvedDatabaseName,
8044 if_exists: bool,
8045) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
8046 match scx.resolve_database(name) {
8047 Ok(database) => Ok(Some(database)),
8048 Err(_) if if_exists => Ok(None),
8049 Err(e) => Err(e),
8050 }
8051}
8052
8053pub(crate) fn resolve_schema<'a>(
8054 scx: &'a StatementContext,
8055 name: UnresolvedSchemaName,
8056 if_exists: bool,
8057) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
8058 match scx.resolve_schema(name) {
8059 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
8060 Err(_) if if_exists => Ok(None),
8061 Err(e) => Err(e),
8062 }
8063}
8064
8065pub(crate) fn resolve_network_policy<'a>(
8066 scx: &'a StatementContext,
8067 name: Ident,
8068 if_exists: bool,
8069) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
8070 match scx.catalog.resolve_network_policy(&name.to_string()) {
8071 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
8072 id: policy.id(),
8073 name: policy.name().to_string(),
8074 })),
8075 Err(_) if if_exists => Ok(None),
8076 Err(e) => Err(e.into()),
8077 }
8078}
8079
8080pub(crate) fn resolve_item_or_type<'a>(
8081 scx: &'a StatementContext,
8082 object_type: ObjectType,
8083 name: UnresolvedItemName,
8084 if_exists: bool,
8085) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
8086 let name = normalize::unresolved_item_name(name)?;
8087 let catalog_item = match object_type {
8088 ObjectType::Type => scx.catalog.resolve_type(&name),
8089 ObjectType::Table
8090 | ObjectType::View
8091 | ObjectType::MaterializedView
8092 | ObjectType::Source
8093 | ObjectType::Sink
8094 | ObjectType::Index
8095 | ObjectType::Role
8096 | ObjectType::Cluster
8097 | ObjectType::ClusterReplica
8098 | ObjectType::Secret
8099 | ObjectType::Connection
8100 | ObjectType::Database
8101 | ObjectType::Schema
8102 | ObjectType::Func
8103 | ObjectType::ContinualTask
8104 | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
8105 };
8106
8107 match catalog_item {
8108 Ok(item) => {
8109 let is_type = ObjectType::from(item.item_type());
8110 if object_type == is_type {
8111 Ok(Some(item))
8112 } else {
8113 Err(PlanError::MismatchedObjectType {
8114 name: scx.catalog.minimal_qualification(item.name()),
8115 is_type,
8116 expected_type: object_type,
8117 })
8118 }
8119 }
8120 Err(_) if if_exists => Ok(None),
8121 Err(e) => Err(e.into()),
8122 }
8123}
8124
8125fn ensure_cluster_is_not_managed(
8127 scx: &StatementContext,
8128 cluster_id: ClusterId,
8129) -> Result<(), PlanError> {
8130 let cluster = scx.catalog.get_cluster(cluster_id);
8131 if cluster.is_managed() {
8132 Err(PlanError::ManagedCluster {
8133 cluster_name: cluster.name().to_string(),
8134 })
8135 } else {
8136 Ok(())
8137 }
8138}