1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::{Either, Itertools};
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::soft_panic_or_log;
32use mz_ore::str::StrExt;
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43 preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48 AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49 AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50 AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51 AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52 AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53 AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54 AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55 ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56 ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57 ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58 ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
59 CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
60 CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
61 CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
62 CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
63 CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
64 CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
65 CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
66 CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
67 CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
68 CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
69 CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
70 CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
71 DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
72 FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
73 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90 IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91 SinkEnvelope, StorageSinkConnection,
92};
93use mz_storage_types::sources::encoding::{
94 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95 SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111 PostgresSourceConnection, PostgresSourcePublicationDetails,
112 ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121 SqlServerSourceExtras, Timeline,
122};
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133 Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140 CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
141 scalar_type_from_sql,
142};
143use crate::plan::scope::Scope;
144use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
145use crate::plan::statement::{StatementContext, StatementDesc, scl};
146use crate::plan::typeconv::CastContext;
147use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
148use crate::plan::{
149 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
150 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
151 AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
152 AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
153 AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
154 AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
155 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
156 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
157 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
158 CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
159 CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
160 CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
161 CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
162 MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
163 PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
164 Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
165 WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
166 transform_ast,
167};
168use crate::session::vars::{
169 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
170 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
171 ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS,
172};
173use crate::{names, parse};
174
175mod connection;
176
/// Maximum number of columns permitted in a relation planned here (enforced
/// below, e.g. for webhook sources).
const MAX_NUM_COLUMNS: usize = 256;

/// Matches replica names of the form `r<digits>` (e.g. `r1`, `r42`).
// NOTE(review): the repetition is written `(\d)+`, so capture group 1 holds only
// the *last* digit; for a plain `is_match` this is equivalent to `(\d+)`, but if
// any caller reads the capture it should be `(\d+)` — confirm usage.
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
185
186fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
190 if partition_by.len() > desc.len() {
191 tracing::error!(
192 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
193 desc.len(),
194 partition_by.len()
195 );
196 partition_by.truncate(desc.len());
197 }
198
199 let desc_prefix = desc.iter().take(partition_by.len());
200 for (idx, ((desc_name, desc_type), partition_name)) in
201 desc_prefix.zip_eq(partition_by).enumerate()
202 {
203 let partition_name = normalize::column_name(partition_name);
204 if *desc_name != partition_name {
205 sql_bail!(
206 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
207 );
208 }
209 if !preserves_order(&desc_type.scalar_type) {
210 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
211 }
212 }
213 Ok(())
214}
215
216pub fn describe_create_database(
217 _: &StatementContext,
218 _: CreateDatabaseStatement,
219) -> Result<StatementDesc, PlanError> {
220 Ok(StatementDesc::new(None))
221}
222
223pub fn plan_create_database(
224 _: &StatementContext,
225 CreateDatabaseStatement {
226 name,
227 if_not_exists,
228 }: CreateDatabaseStatement,
229) -> Result<Plan, PlanError> {
230 Ok(Plan::CreateDatabase(CreateDatabasePlan {
231 name: normalize::ident(name.0),
232 if_not_exists,
233 }))
234}
235
236pub fn describe_create_schema(
237 _: &StatementContext,
238 _: CreateSchemaStatement,
239) -> Result<StatementDesc, PlanError> {
240 Ok(StatementDesc::new(None))
241}
242
243pub fn plan_create_schema(
244 scx: &StatementContext,
245 CreateSchemaStatement {
246 mut name,
247 if_not_exists,
248 }: CreateSchemaStatement,
249) -> Result<Plan, PlanError> {
250 if name.0.len() > 2 {
251 sql_bail!("schema name {} has more than two components", name);
252 }
253 let schema_name = normalize::ident(
254 name.0
255 .pop()
256 .expect("names always have at least one component"),
257 );
258 let database_spec = match name.0.pop() {
259 None => match scx.catalog.active_database() {
260 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
261 None => sql_bail!("no database specified and no active database"),
262 },
263 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
264 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
265 Err(_) => sql_bail!("invalid database {}", n.as_str()),
266 },
267 };
268 Ok(Plan::CreateSchema(CreateSchemaPlan {
269 database_spec,
270 schema_name,
271 if_not_exists,
272 }))
273}
274
275pub fn describe_create_table(
276 _: &StatementContext,
277 _: CreateTableStatement<Aug>,
278) -> Result<StatementDesc, PlanError> {
279 Ok(StatementDesc::new(None))
280}
281
/// Plans `CREATE TABLE`, producing a [`CreateTablePlan`].
///
/// Handles column definitions (NOT NULL, DEFAULT, UNIQUE/PRIMARY KEY,
/// versioned-column changes), table constraints, feature-flag gating for keys
/// and unsupported constraints, and name-collision checks against the catalog.
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    // Column names of the *original* (root-version) relation: columns added via
    // a `Versioned` option are excluded here and applied later as versioned
    // changes to the relation desc.
    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build up the root-version column types, per-column defaults, versioned
    // changes keyed by version, and key sets from column/table constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Plan the default to validate it against the column type;
                    // the (transformed) AST expression is what gets stored.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    // A single-column key; PRIMARY KEY also implies NOT NULL.
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    // Record the column as a change at the given relation
                    // version rather than part of the root-version desc.
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // Versioned columns are added later via `desc.add_column`, so they do
        // not contribute to the root-version relation type.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A plain UNIQUE constraint over a nullable
                                // column without NULLS NOT DISTINCT is not a
                                // key; skip the whole constraint.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // Primary keys sort ahead of other unique keys.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Unless IF NOT EXISTS was given, fail if an item of the same name exists.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Apply the versioned column changes in version order; each addition must
    // land at exactly the version the statement declared.
    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options are planned against the root-version desc.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
501
502pub fn describe_create_table_from_source(
503 _: &StatementContext,
504 _: CreateTableFromSourceStatement<Aug>,
505) -> Result<StatementDesc, PlanError> {
506 Ok(StatementDesc::new(None))
507}
508
509pub fn describe_create_webhook_source(
510 _: &StatementContext,
511 _: CreateWebhookSourceStatement<Aug>,
512) -> Result<StatementDesc, PlanError> {
513 Ok(StatementDesc::new(None))
514}
515
516pub fn describe_create_source(
517 _: &StatementContext,
518 _: CreateSourceStatement<Aug>,
519) -> Result<StatementDesc, PlanError> {
520 Ok(StatementDesc::new(None))
521}
522
523pub fn describe_create_subsource(
524 _: &StatementContext,
525 _: CreateSubsourceStatement<Aug>,
526) -> Result<StatementDesc, PlanError> {
527 Ok(StatementDesc::new(None))
528}
529
// WITH-option extractors: each invocation generates a `...OptionExtracted`
// struct with a typed field per option plus a `seen` set, and a `TryFrom`
// impl over the raw option list.

// Options accepted by `CREATE SOURCE` itself.
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Options on a PostgreSQL source connection.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Options on a MySQL source connection.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Options on a SQL Server source connection.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
557
/// Plans `CREATE SOURCE ... FROM WEBHOOK` (or its table form), producing either
/// a [`CreateTablePlan`] or a [`CreateSourcePlan`] backed by
/// [`DataSourceDesc::Webhook`].
///
/// Builds the relation desc from the body format plus any INCLUDE HEADERS
/// columns, and validates the optional CHECK expression.
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // Resolve the target cluster (may rewrite `stmt.in_cluster`).
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    // Serialize the (possibly rewritten) statement as the stored create SQL.
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // A CHECK expression that ignores all columns would accept or reject
        // every request unconditionally.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Only `now()` is allowed among unmaterializable functions; anything
        // else would make validation non-deterministic.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    // Column 0 is always the request body, typed per the BODY FORMAT.
    let mut column_ty = vec![
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // INCLUDE HEADERS adds a `headers` map column, optionally filtered by
    // allow/block lists.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Each individually mapped header becomes its own nullable column (text,
    // or bytes if requested).
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        // `column_ty` and `column_names` are appended in lockstep above; the
        // mapped-header index must refer to the same position in both.
        let column_idx = column_ty.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Reject duplicate column names and enforce the global column cap.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Unless IF NOT EXISTS was given, fail on a name collision.
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let timeline = Timeline::EpochMilliseconds;

    // Table form carries the cluster id on the data source; source form puts
    // it on the plan instead.
    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}
755
/// Plans `CREATE SOURCE`, producing a [`CreateSourcePlan`].
///
/// Validates ENVELOPE/FORMAT/INCLUDE combinations, plans the external
/// connection, applies the envelope/encoding to derive the relation desc, and
/// chooses between the old (progress-subsource) and new ingestion data-source
/// shapes.
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    // Purification is expected to have already expanded external references.
    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    // When the new source-table syntax is forced, the legacy per-source
    // ENVELOPE/FORMAT/INCLUDE clauses are rejected.
    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    // INCLUDE HEADERS is Kafka-only; INCLUDE <metadata> in general is limited
    // to Kafka and load-generator sources.
    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    // Only Kafka sources contribute extra metadata columns to the desc.
    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Derive the relation desc, planned envelope, and encoding from the
    // connection defaults plus FORMAT/ENVELOPE/INCLUDE clauses.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // PRIMARY KEY NOT ENFORCED: install the declared key on the desc after
    // checking the columns exist, are unambiguous, and no key already exists.
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    // Clamp TIMESTAMP INTERVAL to the system-configured bounds (only when a
    // planning context is available), else use the system default.
    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            if scx.pcx.is_some() {
                let min = scx.catalog.system_vars().min_timestamp_interval();
                let max = scx.catalog.system_vars().max_timestamp_interval();
                if duration < min || duration > max {
                    return Err(PlanError::InvalidTimestampInterval {
                        min,
                        max,
                        requested: duration,
                    });
                }
            }
            duration
        }
        None => scx.catalog.system_vars().default_timestamp_interval(),
    };

    // Old syntax (progress subsource present) keeps the data config on the
    // source itself; new syntax emits a bare ingestion whose desc is just the
    // progress/timestamp relation.
    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            // Per-connection export details for the default (old-syntax)
            // export of this source.
            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Unless IF NOT EXISTS was given, fail on a name collision.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // CDCv2 sources carry their own timeline; everything else uses wall clock.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1024
1025pub fn plan_generic_source_connection(
1026 scx: &StatementContext<'_>,
1027 source_connection: &CreateSourceConnection<Aug>,
1028 include_metadata: &Vec<SourceIncludeMetadata>,
1029) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1030 Ok(match source_connection {
1031 CreateSourceConnection::Kafka {
1032 connection,
1033 options,
1034 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1035 scx,
1036 connection,
1037 options,
1038 include_metadata,
1039 )?),
1040 CreateSourceConnection::Postgres {
1041 connection,
1042 options,
1043 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1044 scx, connection, options,
1045 )?),
1046 CreateSourceConnection::SqlServer {
1047 connection,
1048 options,
1049 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1050 scx, connection, options,
1051 )?),
1052 CreateSourceConnection::MySql {
1053 connection,
1054 options,
1055 } => {
1056 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1057 }
1058 CreateSourceConnection::LoadGenerator { generator, options } => {
1059 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1060 scx,
1061 generator,
1062 options,
1063 include_metadata,
1064 )?)
1065 }
1066 })
1067}
1068
1069fn plan_load_generator_source_connection(
1070 scx: &StatementContext<'_>,
1071 generator: &ast::LoadGenerator,
1072 options: &Vec<LoadGeneratorOption<Aug>>,
1073 include_metadata: &Vec<SourceIncludeMetadata>,
1074) -> Result<LoadGeneratorSourceConnection, PlanError> {
1075 let load_generator =
1076 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1077 let LoadGeneratorOptionExtracted {
1078 tick_interval,
1079 as_of,
1080 up_to,
1081 ..
1082 } = options.clone().try_into()?;
1083 let tick_micros = match tick_interval {
1084 Some(interval) => Some(interval.as_micros().try_into()?),
1085 None => None,
1086 };
1087 if up_to < as_of {
1088 sql_bail!("UP TO cannot be less than AS OF");
1089 }
1090 Ok(LoadGeneratorSourceConnection {
1091 load_generator,
1092 tick_micros,
1093 as_of,
1094 up_to,
1095 })
1096}
1097
1098fn plan_mysql_source_connection(
1099 scx: &StatementContext<'_>,
1100 connection: &ResolvedItemName,
1101 options: &Vec<MySqlConfigOption<Aug>>,
1102) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1103 let connection_item = scx.get_item_by_resolved_name(connection)?;
1104 match connection_item.connection()? {
1105 Connection::MySql(connection) => connection,
1106 _ => sql_bail!(
1107 "{} is not a MySQL connection",
1108 scx.catalog.resolve_full_name(connection_item.name())
1109 ),
1110 };
1111 let MySqlConfigOptionExtracted {
1112 details,
1113 text_columns: _,
1117 exclude_columns: _,
1118 seen: _,
1119 } = options.clone().try_into()?;
1120 let details = details
1121 .as_ref()
1122 .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
1123 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1124 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1125 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1126 Ok(MySqlSourceConnection {
1127 connection: connection_item.id(),
1128 connection_id: connection_item.id(),
1129 details,
1130 })
1131}
1132
1133fn plan_sqlserver_source_connection(
1134 scx: &StatementContext<'_>,
1135 connection: &ResolvedItemName,
1136 options: &Vec<SqlServerConfigOption<Aug>>,
1137) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1138 let connection_item = scx.get_item_by_resolved_name(connection)?;
1139 match connection_item.connection()? {
1140 Connection::SqlServer(connection) => connection,
1141 _ => sql_bail!(
1142 "{} is not a SQL Server connection",
1143 scx.catalog.resolve_full_name(connection_item.name())
1144 ),
1145 };
1146 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1147 let details = details
1148 .as_ref()
1149 .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
1150 let extras = hex::decode(details)
1151 .map_err(|e| sql_err!("{e}"))
1152 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1153 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1154 Ok(SqlServerSourceConnection {
1155 connection_id: connection_item.id(),
1156 connection: connection_item.id(),
1157 extras,
1158 })
1159}
1160
1161fn plan_postgres_source_connection(
1162 scx: &StatementContext<'_>,
1163 connection: &ResolvedItemName,
1164 options: &Vec<PgConfigOption<Aug>>,
1165) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1166 let connection_item = scx.get_item_by_resolved_name(connection)?;
1167 let PgConfigOptionExtracted {
1168 details,
1169 publication,
1170 text_columns: _,
1174 exclude_columns: _,
1178 seen: _,
1179 } = options.clone().try_into()?;
1180 let details = details
1181 .as_ref()
1182 .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
1183 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1184 let details =
1185 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1186 let publication_details =
1187 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1188 Ok(PostgresSourceConnection {
1189 connection: connection_item.id(),
1190 connection_id: connection_item.id(),
1191 publication: publication.expect("validated exists during purification"),
1192 publication_details,
1193 })
1194}
1195
1196fn plan_kafka_source_connection(
1197 scx: &StatementContext<'_>,
1198 connection_name: &ResolvedItemName,
1199 options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1200 include_metadata: &Vec<SourceIncludeMetadata>,
1201) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1202 let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1203 if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1204 sql_bail!(
1205 "{} is not a kafka connection",
1206 scx.catalog.resolve_full_name(connection_item.name())
1207 )
1208 }
1209 let KafkaSourceConfigOptionExtracted {
1210 group_id_prefix,
1211 topic,
1212 topic_metadata_refresh_interval,
1213 start_timestamp: _, start_offset,
1215 seen: _,
1216 }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1217 let topic = topic.expect("validated exists during purification");
1218 let mut start_offsets = BTreeMap::new();
1219 if let Some(offsets) = start_offset {
1220 for (part, offset) in offsets.iter().enumerate() {
1221 if *offset < 0 {
1222 sql_bail!("START OFFSET must be a nonnegative integer");
1223 }
1224 start_offsets.insert(i32::try_from(part)?, *offset);
1225 }
1226 }
1227 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1228 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1231 }
1232 let metadata_columns = include_metadata
1233 .into_iter()
1234 .flat_map(|item| match item {
1235 SourceIncludeMetadata::Timestamp { alias } => {
1236 let name = match alias {
1237 Some(name) => name.to_string(),
1238 None => "timestamp".to_owned(),
1239 };
1240 Some((name, KafkaMetadataKind::Timestamp))
1241 }
1242 SourceIncludeMetadata::Partition { alias } => {
1243 let name = match alias {
1244 Some(name) => name.to_string(),
1245 None => "partition".to_owned(),
1246 };
1247 Some((name, KafkaMetadataKind::Partition))
1248 }
1249 SourceIncludeMetadata::Offset { alias } => {
1250 let name = match alias {
1251 Some(name) => name.to_string(),
1252 None => "offset".to_owned(),
1253 };
1254 Some((name, KafkaMetadataKind::Offset))
1255 }
1256 SourceIncludeMetadata::Headers { alias } => {
1257 let name = match alias {
1258 Some(name) => name.to_string(),
1259 None => "headers".to_owned(),
1260 };
1261 Some((name, KafkaMetadataKind::Headers))
1262 }
1263 SourceIncludeMetadata::Header {
1264 alias,
1265 key,
1266 use_bytes,
1267 } => Some((
1268 alias.to_string(),
1269 KafkaMetadataKind::Header {
1270 key: key.clone(),
1271 use_bytes: *use_bytes,
1272 },
1273 )),
1274 SourceIncludeMetadata::Key { .. } => {
1275 None
1277 }
1278 })
1279 .collect();
1280 Ok(KafkaSourceConnection {
1281 connection: connection_item.id(),
1282 connection_id: connection_item.id(),
1283 topic,
1284 start_offsets,
1285 group_id_prefix,
1286 topic_metadata_refresh_interval,
1287 metadata_columns,
1288 })
1289}
1290
/// Derives the relation description, planned envelope, and optional
/// encoding for a source (or source export) from its `FORMAT`,
/// `ENVELOPE`, and `INCLUDE <metadata>` clauses.
///
/// `key_desc`/`value_desc` describe what the connection itself produces.
/// When a `FORMAT` is present they are replaced by the format-derived
/// descriptions, and the connection must emit a single `bytes` column for
/// the decoder to consume. `metadata_columns_desc` contributes the extra
/// columns requested via `INCLUDE <metadata>`.
///
/// # Errors
///
/// Fails for incompatible combinations, e.g. `INCLUDE KEY` with
/// `ENVELOPE DEBEZIUM`, non-Avro `ENVELOPE MATERIALIZE`, or `ENVELOPE
/// UPSERT` without a key format (outside the key-value load generator).
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    // Plan the decoder configuration, if any `FORMAT` was given.
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // Format decoding consumes raw bytes, so the connection's own
            // output must be exactly one `bytes` column.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            // Replace the connection descriptions with the format-derived
            // key/value descriptions.
            let (key_desc, value_desc) = encoding.desc()?;

            // For Kafka with `ENVELOPE NONE`, every key column is forced
            // nullable. NOTE(review): presumably because a Kafka record's
            // key may be absent in that configuration — confirm.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // The key-value load generator produces keys without any key format,
    // so `INCLUDE KEY` is permitted there even with no encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    // `INCLUDE KEY` is rejected for Debezium, whose values already carry
    // the key data.
    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    // Translate the AST envelope into the unplanned storage envelope.
    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            // If the value fails the Debezium typecheck and the value
            // format is not Avro, report the format requirement instead of
            // the raw type error.
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            // UPSERT needs a key: either from the key format or, for the
            // key-value load generator, without any encoding at all.
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // Absent an explicit key envelope, fall back to the unnamed
            // variant derived from the key encoding.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // `VALUE DECODE ERROR POLICY` selects the upsert style; the
            // inline-errors variant is feature-gated and surfaces decode
            // errors in a dedicated column (default name "error").
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            // Only a bare Avro value format is supported for CDCv2.
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    // Fold the key, value, and metadata descriptions into the single
    // relation description this collection will present.
    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1469
/// Plans the [`RelationDesc`] for a source export (subsource or source-fed
/// table) whose columns and constraints were spelled out in the statement.
///
/// Supported column options are `NOT NULL`, `UNIQUE`, and `PRIMARY KEY`;
/// defaults, foreign keys, and check constraints are rejected as
/// unsupported.
///
/// # Errors
///
/// Fails on duplicate column names, unknown constraint columns, multiple
/// primary keys, or any unsupported option/constraint.
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Columns are nullable unless `NOT NULL` or `PRIMARY KEY` says
    // otherwise; each `UNIQUE`/`PRIMARY KEY` column option also
    // contributes a single-column key.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        // PRIMARY KEY implies NOT NULL.
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    // Table-level constraints. At most one PRIMARY KEY is allowed.
    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                // PRIMARY KEY implies NOT NULL.
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A plain UNIQUE constraint over a nullable
                                // column (without NULLS NOT DISTINCT) is not
                                // a usable key.
                                // NOTE(review): `break 'c` abandons *all*
                                // remaining constraints, not just this one;
                                // `continue 'c` may have been intended —
                                // confirm.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // Place the primary key in the first key position.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}
1577
// Options accepted by `CREATE SUBSOURCE`. The macro derives
// `CreateSubsourceOptionExtracted`, consumed in `plan_create_subsource`
// below; `Details` carries hex-encoded protobuf written during
// purification.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1587
/// Plans `CREATE SUBSOURCE`.
///
/// A subsource is either the progress collection of a source (`PROGRESS`
/// option) or an export of one of the source's upstream tables
/// (`EXTERNAL REFERENCE` + `OF SOURCE`). These statements are
/// system-generated; several options (notably `DETAILS`) are populated
/// during purification rather than written by users.
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // Exactly one of the two forms must be present; violating this is an
    // internal invariant failure, hence `assert!` rather than `sql_bail!`.
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // When the catalog forces the newer `CREATE TABLE ... FROM SOURCE`
        // syntax, reject creating new subsource exports.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        // `DETAILS` is a hex-encoded protobuf written during purification.
        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        // Convert the statement-level details into storage-layer export
        // details, folding in TEXT/EXCLUDE COLUMNS where applicable.
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources never carry a format/envelope of their own.
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        // Subsources always live on the epoch-milliseconds timeline.
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}
1732
// Options accepted by `CREATE TABLE ... FROM SOURCE`. The macro derives
// `TableFromSourceOptionExtracted`, consumed in
// `plan_create_table_from_source` below.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1741
/// Plans `CREATE TABLE ... FROM SOURCE`, the newer syntax for exporting an
/// upstream table/topic of an existing source into its own collection.
///
/// Unlike subsources (see `plan_create_subsource`), a source-fed table may
/// carry its own `FORMAT`, `ENVELOPE`, and `INCLUDE <metadata>` clauses;
/// most metadata kinds are restricted to Kafka-backed exports.
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Feature-gated behind a system variable.
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // `DETAILS` is a hex-encoded protobuf populated during purification.
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    // `INCLUDE HEADERS` is Kafka-only; other metadata kinds are permitted
    // only for Kafka and load-generator exports.
    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    // Convert the statement-level details into storage-layer export
    // details, folding in TEXT/EXCLUDE COLUMNS and, for Kafka, the
    // requested metadata columns.
    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            // Metadata columns are only supported with these envelopes.
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Map each `INCLUDE <metadata>` clause to a (name, kind) pair;
            // aliases override the default column names.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // `INCLUDE KEY` is planned via the key envelope,
                        // not as a metadata column.
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    // If the statement spelled out columns/constraints, plan them
    // directly; otherwise fall back to the connection's default key/value
    // descriptions.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    // `Named` columns rename the derived columns positionally.
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // CDCv2 (`ENVELOPE MATERIALIZE`) collections carry their own external
    // timeline; everything else uses epoch milliseconds.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    // `PARTITION BY` is feature-gated and must name a valid prefix of the
    // description's columns (validated by `check_partition_by`).
    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2005
// Options accepted by `LOAD GENERATOR` sources. Which subset is legal for
// a given generator variant is enforced by
// `LoadGeneratorOptionExtracted::ensure_only_valid_options`.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2021
2022impl LoadGeneratorOptionExtracted {
2023 pub(super) fn ensure_only_valid_options(
2024 &self,
2025 loadgen: &ast::LoadGenerator,
2026 ) -> Result<(), PlanError> {
2027 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2028
2029 let mut options = self.seen.clone();
2030
2031 let permitted_options: &[_] = match loadgen {
2032 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2033 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2034 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2035 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2036 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2037 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2038 ast::LoadGenerator::KeyValue => &[
2039 TickInterval,
2040 Keys,
2041 SnapshotRounds,
2042 TransactionalSnapshot,
2043 ValueSize,
2044 Seed,
2045 Partitions,
2046 BatchSize,
2047 ],
2048 };
2049
2050 for o in permitted_options {
2051 options.remove(o);
2052 }
2053
2054 if !options.is_empty() {
2055 sql_bail!(
2056 "{} load generators do not support {} values",
2057 loadgen,
2058 options.iter().join(", ")
2059 )
2060 }
2061
2062 Ok(())
2063 }
2064}
2065
2066pub(crate) fn load_generator_ast_to_generator(
2067 scx: &StatementContext,
2068 loadgen: &ast::LoadGenerator,
2069 options: &[LoadGeneratorOption<Aug>],
2070 include_metadata: &[SourceIncludeMetadata],
2071) -> Result<LoadGenerator, PlanError> {
2072 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2073 extracted.ensure_only_valid_options(loadgen)?;
2074
2075 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2076 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2077 }
2078
2079 let load_generator = match loadgen {
2080 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2081 ast::LoadGenerator::Clock => {
2082 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2083 LoadGenerator::Clock
2084 }
2085 ast::LoadGenerator::Counter => {
2086 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2087 let LoadGeneratorOptionExtracted {
2088 max_cardinality, ..
2089 } = extracted;
2090 LoadGenerator::Counter { max_cardinality }
2091 }
2092 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2093 ast::LoadGenerator::Datums => {
2094 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2095 LoadGenerator::Datums
2096 }
2097 ast::LoadGenerator::Tpch => {
2098 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2099
2100 let sf: f64 = scale_factor.unwrap_or(0.01);
2102 if !sf.is_finite() || sf < 0.0 {
2103 sql_bail!("unsupported scale factor {sf}");
2104 }
2105
2106 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2107 let total = (sf * multiplier).floor();
2108 let mut i = i64::try_cast_from(total)
2109 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2110 if i < 1 {
2111 i = 1;
2112 }
2113 Ok(i)
2114 };
2115
2116 let count_supplier = f_to_i(10_000f64)?;
2119 let count_part = f_to_i(200_000f64)?;
2120 let count_customer = f_to_i(150_000f64)?;
2121 let count_orders = f_to_i(150_000f64 * 10f64)?;
2122 let count_clerk = f_to_i(1_000f64)?;
2123
2124 LoadGenerator::Tpch {
2125 count_supplier,
2126 count_part,
2127 count_customer,
2128 count_orders,
2129 count_clerk,
2130 }
2131 }
2132 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2133 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2134 let LoadGeneratorOptionExtracted {
2135 keys,
2136 snapshot_rounds,
2137 transactional_snapshot,
2138 value_size,
2139 tick_interval,
2140 seed,
2141 partitions,
2142 batch_size,
2143 ..
2144 } = extracted;
2145
2146 let mut include_offset = None;
2147 for im in include_metadata {
2148 match im {
2149 SourceIncludeMetadata::Offset { alias } => {
2150 include_offset = match alias {
2151 Some(alias) => Some(alias.to_string()),
2152 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2153 }
2154 }
2155 SourceIncludeMetadata::Key { .. } => continue,
2156
2157 _ => {
2158 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2159 }
2160 };
2161 }
2162
2163 let lgkv = KeyValueLoadGenerator {
2164 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2165 snapshot_rounds: snapshot_rounds
2166 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2167 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2169 value_size: value_size
2170 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2171 partitions: partitions
2172 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2173 tick_interval,
2174 batch_size: batch_size
2175 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2176 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2177 include_offset,
2178 };
2179
2180 if lgkv.keys == 0
2181 || lgkv.partitions == 0
2182 || lgkv.value_size == 0
2183 || lgkv.batch_size == 0
2184 {
2185 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2186 }
2187
2188 if lgkv.keys % lgkv.partitions != 0 {
2189 sql_bail!("KEYS must be a multiple of PARTITIONS")
2190 }
2191
2192 if lgkv.batch_size > lgkv.keys {
2193 sql_bail!("KEYS must be larger than BATCH SIZE")
2194 }
2195
2196 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2199 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2200 }
2201
2202 if lgkv.snapshot_rounds == 0 {
2203 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2204 }
2205
2206 LoadGenerator::KeyValue(lgkv)
2207 }
2208 };
2209
2210 Ok(load_generator)
2211}
2212
2213fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2214 let before = value_desc.get_by_name(&"before".into());
2215 let (after_idx, after_ty) = value_desc
2216 .get_by_name(&"after".into())
2217 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2218 let before_idx = if let Some((before_idx, before_ty)) = before {
2219 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2220 sql_bail!("'before' column must be of type record");
2221 }
2222 if before_ty != after_ty {
2223 sql_bail!("'before' type differs from 'after' column");
2224 }
2225 Some(before_idx)
2226 } else {
2227 None
2228 };
2229 Ok((before_idx, after_idx))
2230}
2231
2232fn get_encoding(
2233 scx: &StatementContext,
2234 format: &FormatSpecifier<Aug>,
2235 envelope: &ast::SourceEnvelope,
2236) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2237 let encoding = match format {
2238 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2239 FormatSpecifier::KeyValue { key, value } => {
2240 let key = {
2241 let encoding = get_encoding_inner(scx, key)?;
2242 Some(encoding.key.unwrap_or(encoding.value))
2243 };
2244 let value = get_encoding_inner(scx, value)?.value;
2245 SourceDataEncoding { key, value }
2246 }
2247 };
2248
2249 let requires_keyvalue = matches!(
2250 envelope,
2251 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2252 );
2253 let is_keyvalue = encoding.key.is_some();
2254 if requires_keyvalue && !is_keyvalue {
2255 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2256 };
2257
2258 Ok(encoding)
2259}
2260
2261fn source_sink_cluster_config<'a, 'ctx>(
2267 scx: &'a StatementContext<'ctx>,
2268 in_cluster: &mut Option<ResolvedClusterName>,
2269) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2270 let cluster = match in_cluster {
2271 None => {
2272 let cluster = scx.catalog.resolve_cluster(None)?;
2273 *in_cluster = Some(ResolvedClusterName {
2274 id: cluster.id(),
2275 print_name: None,
2276 });
2277 cluster
2278 }
2279 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2280 };
2281
2282 Ok(cluster)
2283}
2284
2285generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2286
/// Avro schemas (and related metadata) gathered from a `FORMAT AVRO` clause,
/// either inline or via a Confluent Schema Registry (CSR) connection.
#[derive(Debug)]
pub struct Schema {
    // Avro schema for the key, if one was provided (CSR seeds only).
    pub key_schema: Option<String>,
    // Avro schema for the value.
    pub value_schema: String,
    // Schemas referenced by the key schema, if any.
    pub key_reference_schemas: Vec<String>,
    // Schemas referenced by the value schema, if any.
    pub value_reference_schemas: Vec<String>,
    // The CSR connection the schemas came from; `None` for inline schemas.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    // Whether messages are framed using the Confluent wire format.
    pub confluent_wire_format: bool,
}
2298
/// Plans the encoding for a single `FORMAT` clause of a source.
///
/// Normally returns a [`SourceDataEncoding`] with `key: None` and the planned
/// value encoding. The exceptions are Avro CSR seeds that include a key
/// schema and Protobuf CSR seeds that include a key message: those return
/// early with both key and value encodings populated.
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                key_reference_schemas,
                value_reference_schemas,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                // Inline schema: no registry involved; the wire format is
                // controlled by the `CONFLUENT WIRE FORMAT` option.
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                // Registry schema: purification must already have fetched
                // the schemas into `seed`.
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    // Only the item id is kept; we just verify the
                    // connection is actually a schema registry connection.
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            key_reference_schemas: seed.key_reference_schemas.clone(),
                            value_reference_schemas: seed.value_reference_schemas.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            // A key schema from the seed means both halves of the encoding
            // are known; return them together.
            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        reference_schemas: key_reference_schemas,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        reference_schemas: value_reference_schemas,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    reference_schemas: value_reference_schemas,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    let item = scx.get_item_by_resolved_name(connection)?;
                    // The connection itself is unused; this only validates
                    // that the item is a schema registry connection.
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    // A key seed means a key encoding too; return both.
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    // Purification fills in the header names; an empty list
                    // here is a planner bug.
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}
2488
2489fn get_key_envelope(
2491 included_items: &[SourceIncludeMetadata],
2492 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2493 key_envelope_no_encoding: bool,
2494) -> Result<KeyEnvelope, PlanError> {
2495 let key_definition = included_items
2496 .iter()
2497 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2498 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2499 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2500 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2501 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2502 (Some(name), _) if key_envelope_no_encoding => {
2503 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2504 }
2505 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2506 (_, None) => {
2507 sql_bail!(
2511 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2512 got bare FORMAT"
2513 );
2514 }
2515 }
2516 } else {
2517 Ok(KeyEnvelope::None)
2518 }
2519}
2520
2521fn get_unnamed_key_envelope(
2524 key: Option<&DataEncoding<ReferencedConnection>>,
2525) -> Result<KeyEnvelope, PlanError> {
2526 let is_composite = match key {
2530 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2531 Some(
2532 DataEncoding::Avro(_)
2533 | DataEncoding::Csv(_)
2534 | DataEncoding::Protobuf(_)
2535 | DataEncoding::Regex { .. },
2536 ) => true,
2537 None => false,
2538 };
2539
2540 if is_composite {
2541 Ok(KeyEnvelope::Flattened)
2542 } else {
2543 Ok(KeyEnvelope::Named("key".to_string()))
2544 }
2545}
2546
2547pub fn describe_create_view(
2548 _: &StatementContext,
2549 _: CreateViewStatement<Aug>,
2550) -> Result<StatementDesc, PlanError> {
2551 Ok(StatementDesc::new(None))
2552}
2553
/// Plans a view definition into a catalog-ready [`View`].
///
/// Normalizes the statement into `create_sql`, plans the query, applies the
/// optional column rename list, and collects the view's catalog
/// dependencies. Returns the allocated qualified name together with the
/// planned [`View`]. Errors if the query contains parameters or if a column
/// name appears more than once.
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    // Reconstruct a canonical `CREATE VIEW` statement to store in the
    // catalog as `create_sql`.
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    // A view query must carry only a trivial row-set finishing; anything
    // else is a planner invariant violation.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    // Map global ids of referenced collections to catalog item ids.
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Apply the `(col1, col2, ...)` rename list, if one was given.
    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}
2626
/// Plans a `CREATE [OR REPLACE] [TEMPORARY] VIEW` statement.
///
/// Delegates the view body to [`plan_view`], then handles `OR REPLACE`
/// semantics (dropping the old view and its dependents), dependency
/// validation, and the name-collision check for plain `CREATE VIEW`.
pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    // Planning contexts can ask that IF EXISTS errors be ignored; in that
    // case OR REPLACE drop planning is skipped as well.
    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        // The new view may not depend on the view it is replacing.
        if let Some(id) = maybe_item_to_drop {
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    // Everything transitively dependent on the replaced view is dropped too.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    validate_view_dependencies(scx, &view.dependencies.0)?;

    // Name-collision check, only under `IfExistsBehavior::Error` and when
    // IF EXISTS errors are not being ignored.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
2712
2713fn validate_view_dependencies(
2715 scx: &StatementContext,
2716 dependencies: &BTreeSet<CatalogItemId>,
2717) -> Result<(), PlanError> {
2718 for id in dependencies {
2719 let item = scx.catalog.get_item(id);
2720 if item.replacement_target().is_some() {
2721 let name = scx.catalog.minimal_qualification(item.name());
2722 return Err(PlanError::InvalidDependency {
2723 name: name.to_string(),
2724 item_type: format!("replacement {}", item.item_type()),
2725 });
2726 }
2727 }
2728
2729 Ok(())
2730}
2731
2732pub fn describe_create_materialized_view(
2733 _: &StatementContext,
2734 _: CreateMaterializedViewStatement<Aug>,
2735) -> Result<StatementDesc, PlanError> {
2736 Ok(StatementDesc::new(None))
2737}
2738
2739pub fn describe_create_continual_task(
2740 _: &StatementContext,
2741 _: CreateContinualTaskStatement<Aug>,
2742) -> Result<StatementDesc, PlanError> {
2743 Ok(StatementDesc::new(None))
2744}
2745
2746pub fn describe_create_network_policy(
2747 _: &StatementContext,
2748 _: CreateNetworkPolicyStatement<Aug>,
2749) -> Result<StatementDesc, PlanError> {
2750 Ok(StatementDesc::new(None))
2751}
2752
2753pub fn describe_alter_network_policy(
2754 _: &StatementContext,
2755 _: AlterNetworkPolicyStatement<Aug>,
2756) -> Result<StatementDesc, PlanError> {
2757 Ok(StatementDesc::new(None))
2758}
2759
/// Plans a `CREATE MATERIALIZED VIEW` statement.
///
/// Resolves the target cluster (and optional target replica), plans the
/// query, processes the `WITH (...)` options (`ASSERT NOT NULL`,
/// `PARTITION BY`, `RETAIN HISTORY`, `REFRESH`), handles `OR REPLACE` /
/// `IF NOT EXISTS` semantics, and validates any `REPLACEMENT FOR` target
/// along with the view's dependencies.
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Record the resolved cluster in the statement so the normalized
    // `create_sql` names it explicitly.
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Replica-targeted materialized views are behind a feature flag.
    let target_replica = match &stmt.in_cluster_replica {
        Some(replica_name) => {
            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;

            let cluster = scx.catalog.get_cluster(cluster_id);
            let replica_id = cluster
                .replica_ids()
                .get(replica_name.as_str())
                .copied()
                .ok_or_else(|| {
                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
                })?;
            Some(replica_id)
        }
        None => None,
    };

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // The query must carry only a trivial row-set finishing; anything else
    // is a planner invariant violation.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    // Apply the optional `(col1, ...)` rename list.
    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    // Fold every `REFRESH ...` option into a single `RefreshSchedule`.
    // `REFRESH ON COMMIT` may appear at most once and not alongside other
    // refresh options; a still-default (empty) schedule becomes `None`.
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    // Purification rewrites REFRESH AT CREATION; seeing it
                    // here is an internal error.
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    // Evaluate the `REFRESH AT` expression down to a literal
                    // mz_timestamp.
                    transform_ast::transform(scx, &mut time)?;
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        // Month-bearing intervals have no fixed length, so
                        // `duration()` below could not handle them anyway.
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    // Purification fills in ALIGNED TO; seeing `None` here
                    // is an internal error.
                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    // Evaluate `ALIGNED TO` down to a literal mz_timestamp,
                    // mirroring the `REFRESH AT` handling above.
                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    // Resolve each `ASSERT NOT NULL` column name to its column index.
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // The planning context may force IF NOT EXISTS semantics.
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            // The new definition may not depend on the item it replaces.
            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    // Everything transitively dependent on a replaced view is dropped too.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Validate a `REPLACEMENT FOR` target, if present: it must be a
    // non-system materialized view, must not create a dependency cycle, and
    // must not already have a replacement.
    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacement_for {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        // Depending on a dependent of the target would make this view
        // (transitively) depend on itself.
        for dependent in scx.catalog.item_dependents(target.id()) {
            if let ObjectId::Item(id) = dependent
                && dependencies.contains(&id)
            {
                sql_bail!(
                    "replacement would cause {} to depend on itself",
                    scx.catalog.minimal_qualification(target.name()),
                );
            }
        }

        dependencies.insert(target.id());

        // Only one replacement per target is allowed.
        for use_id in target.used_by() {
            let use_item = scx.get_item(use_id);
            if use_item.replacement_target() == Some(target.id()) {
                sql_bail!(
                    "cannot replace {} because it already has a replacement: {}",
                    scx.catalog.minimal_qualification(target.name()),
                    scx.catalog.minimal_qualification(use_item.name()),
                );
            }
        }

        replacement_target = Some(target.id());
    }

    validate_view_dependencies(scx, &dependencies)?;

    // Name-collision check under `IfExistsBehavior::Error`.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            target_replica,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
3115
// Option extraction for `CREATE MATERIALIZED VIEW ... WITH (...)`.
// `ASSERT NOT NULL` and `REFRESH` may be specified multiple times.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3123
3124pub fn plan_create_continual_task(
3125 scx: &StatementContext,
3126 mut stmt: CreateContinualTaskStatement<Aug>,
3127) -> Result<Plan, PlanError> {
3128 match &stmt.sugar {
3129 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3130 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3131 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3132 }
3133 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3134 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3135 }
3136 };
3137 let cluster_id = match &stmt.in_cluster {
3138 None => scx.catalog.resolve_cluster(None)?.id(),
3139 Some(in_cluster) => in_cluster.id,
3140 };
3141 stmt.in_cluster = Some(ResolvedClusterName {
3142 id: cluster_id,
3143 print_name: None,
3144 });
3145
3146 let create_sql =
3147 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3148
3149 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3150
3151 let mut desc = match stmt.columns {
3156 None => None,
3157 Some(columns) => {
3158 let mut desc_columns = Vec::with_capacity(columns.capacity());
3159 for col in columns.iter() {
3160 desc_columns.push((
3161 normalize::column_name(col.name.clone()),
3162 SqlColumnType {
3163 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3164 nullable: false,
3165 },
3166 ));
3167 }
3168 Some(RelationDesc::from_names_and_types(desc_columns))
3169 }
3170 };
3171 let input = scx.get_item_by_resolved_name(&stmt.input)?;
3172 match input.item_type() {
3173 CatalogItemType::ContinualTask
3176 | CatalogItemType::Table
3177 | CatalogItemType::MaterializedView
3178 | CatalogItemType::Source => {}
3179 CatalogItemType::Sink
3180 | CatalogItemType::View
3181 | CatalogItemType::Index
3182 | CatalogItemType::Type
3183 | CatalogItemType::Func
3184 | CatalogItemType::Secret
3185 | CatalogItemType::Connection => {
3186 sql_bail!(
3187 "CONTINUAL TASK cannot use {} as an input",
3188 input.item_type()
3189 );
3190 }
3191 }
3192
3193 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3194 let ct_name = stmt.name;
3195 let placeholder_id = match &ct_name {
3196 ResolvedItemName::ContinualTask { id, name } => {
3197 let desc = match desc.as_ref().cloned() {
3198 Some(x) => x,
3199 None => {
3200 let desc = input.relation_desc().expect("item type checked above");
3205 desc.into_owned()
3206 }
3207 };
3208 qcx.ctes.insert(
3209 *id,
3210 CteDesc {
3211 name: name.item.clone(),
3212 desc,
3213 },
3214 );
3215 Some(*id)
3216 }
3217 _ => None,
3218 };
3219
3220 let mut exprs = Vec::new();
3221 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3222 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3223 let query::PlannedRootQuery {
3224 expr,
3225 desc: desc_query,
3226 finishing,
3227 scope: _,
3228 } = query::plan_ct_query(&mut qcx, query)?;
3229 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3232 &finishing,
3233 expr.arity()
3234 ));
3235 if expr.contains_parameters()? {
3236 if expr.contains_parameters()? {
3237 return Err(PlanError::ParameterNotAllowed(
3238 "continual tasks".to_string(),
3239 ));
3240 }
3241 }
3242 let expr = match desc.as_mut() {
3243 None => {
3244 desc = Some(desc_query);
3245 expr
3246 }
3247 Some(desc) => {
3248 if desc_query.arity() > desc.arity() {
3251 sql_bail!(
3252 "statement {}: INSERT has more expressions than target columns",
3253 idx
3254 );
3255 }
3256 if desc_query.arity() < desc.arity() {
3257 sql_bail!(
3258 "statement {}: INSERT has more target columns than expressions",
3259 idx
3260 );
3261 }
3262 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3265 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3266 let expr = expr.map_err(|e| {
3267 sql_err!(
3268 "statement {}: column {} is of type {} but expression is of type {}",
3269 idx,
3270 desc.get_name(e.column).quoted(),
3271 qcx.humanize_sql_scalar_type(&e.target_type, false),
3272 qcx.humanize_sql_scalar_type(&e.source_type, false),
3273 )
3274 })?;
3275
3276 let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3279 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3280 if updated {
3281 let new_types = zip_types().map(|(ct, q)| {
3282 let mut ct = ct.clone();
3283 if q.nullable {
3284 ct.nullable = true;
3285 }
3286 ct
3287 });
3288 *desc = RelationDesc::from_names_and_types(
3289 desc.iter_names().cloned().zip_eq(new_types),
3290 );
3291 }
3292
3293 expr
3294 }
3295 };
3296 match stmt {
3297 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3298 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3299 }
3300 }
3301 let expr = exprs
3304 .into_iter()
3305 .reduce(|acc, expr| acc.union(expr))
3306 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3307 let dependencies = expr
3308 .depends_on()
3309 .into_iter()
3310 .map(|gid| scx.catalog.resolve_item_id(&gid))
3311 .collect();
3312
3313 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3314 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3315 if let Some(dup) = column_names.iter().duplicates().next() {
3316 sql_bail!("column {} specified more than once", dup.quoted());
3317 }
3318
3319 let name = match &ct_name {
3321 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3322 ResolvedItemName::ContinualTask { name, .. } => {
3323 let name = scx.allocate_qualified_name(name.clone())?;
3324 let full_name = scx.catalog.resolve_full_name(&name);
3325 let partial_name = PartialItemName::from(full_name.clone());
3326 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3329 return Err(PlanError::ItemAlreadyExists {
3330 name: full_name.to_string(),
3331 item_type: item.item_type(),
3332 });
3333 }
3334 name
3335 }
3336 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3337 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3338 };
3339
3340 let as_of = stmt.as_of.map(Timestamp::from);
3341 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3342 name,
3343 placeholder_id,
3344 desc,
3345 input_id: input.global_id(),
3346 with_snapshot: snapshot.unwrap_or(true),
3347 continual_task: MaterializedView {
3348 create_sql,
3349 expr,
3350 dependencies,
3351 column_names,
3352 replacement_target: None,
3353 cluster_id,
3354 target_replica: None,
3355 non_null_assertions: Vec::new(),
3356 compaction_window: None,
3357 refresh_schedule: None,
3358 as_of,
3359 },
3360 }))
3361}
3362
3363fn continual_task_query<'a>(
3364 ct_name: &ResolvedItemName,
3365 stmt: &'a ast::ContinualTaskStmt<Aug>,
3366) -> Option<ast::Query<Aug>> {
3367 match stmt {
3368 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3369 table_name: _,
3370 columns,
3371 source,
3372 returning,
3373 }) => {
3374 if !columns.is_empty() || !returning.is_empty() {
3375 return None;
3376 }
3377 match source {
3378 ast::InsertSource::Query(query) => Some(query.clone()),
3379 ast::InsertSource::DefaultValues => None,
3380 }
3381 }
3382 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3383 table_name: _,
3384 alias,
3385 using,
3386 selection,
3387 }) => {
3388 if !using.is_empty() {
3389 return None;
3390 }
3391 let from = ast::TableWithJoins {
3393 relation: ast::TableFactor::Table {
3394 name: ct_name.clone(),
3395 alias: alias.clone(),
3396 },
3397 joins: Vec::new(),
3398 };
3399 let select = ast::Select {
3400 from: vec![from],
3401 selection: selection.clone(),
3402 distinct: None,
3403 projection: vec![ast::SelectItem::Wildcard],
3404 group_by: Vec::new(),
3405 having: None,
3406 qualify: None,
3407 options: Vec::new(),
3408 };
3409 let query = ast::Query {
3410 ctes: ast::CteBlock::Simple(Vec::new()),
3411 body: ast::SetExpr::Select(Box::new(select)),
3412 order_by: Vec::new(),
3413 limit: None,
3414 offset: None,
3415 };
3416 Some(query)
3418 }
3419 }
3420}
3421
3422generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3423
3424pub fn describe_create_sink(
3425 _: &StatementContext,
3426 _: CreateSinkStatement<Aug>,
3427) -> Result<StatementDesc, PlanError> {
3428 Ok(StatementDesc::new(None))
3429}
3430
// Typed extraction of `CREATE SINK` WITH options; generates
// `CreateSinkOptionExtracted`, consumed by `plan_sink` via `try_into`.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3438
3439pub fn plan_create_sink(
3440 scx: &StatementContext,
3441 stmt: CreateSinkStatement<Aug>,
3442) -> Result<Plan, PlanError> {
3443 let Some(name) = stmt.name.clone() else {
3445 return Err(PlanError::MissingName(CatalogItemType::Sink));
3446 };
3447 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3448 let full_name = scx.catalog.resolve_full_name(&name);
3449 let partial_name = PartialItemName::from(full_name.clone());
3450 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3451 return Err(PlanError::ItemAlreadyExists {
3452 name: full_name.to_string(),
3453 item_type: item.item_type(),
3454 });
3455 }
3456
3457 plan_sink(scx, stmt)
3458}
3459
3460fn plan_sink(
3465 scx: &StatementContext,
3466 mut stmt: CreateSinkStatement<Aug>,
3467) -> Result<Plan, PlanError> {
3468 let CreateSinkStatement {
3469 name,
3470 in_cluster: _,
3471 from,
3472 connection,
3473 format,
3474 envelope,
3475 mode,
3476 if_not_exists,
3477 with_options,
3478 } = stmt.clone();
3479
3480 let Some(name) = name else {
3481 return Err(PlanError::MissingName(CatalogItemType::Sink));
3482 };
3483 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3484
3485 let envelope = match (&connection, envelope, mode) {
3486 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3488 SinkEnvelope::Upsert
3489 }
3490 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3491 SinkEnvelope::Debezium
3492 }
3493 (CreateSinkConnection::Kafka { .. }, None, None) => {
3494 sql_bail!("ENVELOPE clause is required")
3495 }
3496 (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3497 sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3498 }
3499 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3501 SinkEnvelope::Upsert
3502 }
3503 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
3504 SinkEnvelope::Append
3505 }
3506 (CreateSinkConnection::Iceberg { .. }, None, None) => {
3507 sql_bail!("MODE clause is required")
3508 }
3509 (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3510 sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3511 }
3512 };
3513
3514 let from_name = &from;
3515 let from = scx.get_item_by_resolved_name(&from)?;
3516
3517 {
3518 use CatalogItemType::*;
3519 match from.item_type() {
3520 Table | Source | MaterializedView | ContinualTask => {
3521 if from.replacement_target().is_some() {
3522 let name = scx.catalog.minimal_qualification(from.name());
3523 return Err(PlanError::InvalidSinkFrom {
3524 name: name.to_string(),
3525 item_type: format!("replacement {}", from.item_type()),
3526 });
3527 }
3528 }
3529 Sink | View | Index | Type | Func | Secret | Connection => {
3530 let name = scx.catalog.minimal_qualification(from.name());
3531 return Err(PlanError::InvalidSinkFrom {
3532 name: name.to_string(),
3533 item_type: from.item_type().to_string(),
3534 });
3535 }
3536 }
3537 }
3538
3539 if from.id().is_system() {
3540 bail_unsupported!("creating a sink directly on a catalog object");
3541 }
3542
3543 let desc = from.relation_desc().expect("item type checked above");
3544 let key_indices = match &connection {
3545 CreateSinkConnection::Kafka { key: Some(key), .. }
3546 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3547 let key_columns = key
3548 .key_columns
3549 .clone()
3550 .into_iter()
3551 .map(normalize::column_name)
3552 .collect::<Vec<_>>();
3553 let mut uniq = BTreeSet::new();
3554 for col in key_columns.iter() {
3555 if !uniq.insert(col) {
3556 sql_bail!("duplicate column referenced in KEY: {}", col);
3557 }
3558 }
3559 let indices = key_columns
3560 .iter()
3561 .map(|col| -> anyhow::Result<usize> {
3562 let name_idx =
3563 desc.get_by_name(col)
3564 .map(|(idx, _type)| idx)
3565 .ok_or_else(|| {
3566 sql_err!("column referenced in KEY does not exist: {}", col)
3567 })?;
3568 if desc.get_unambiguous_name(name_idx).is_none() {
3569 sql_err!("column referenced in KEY is ambiguous: {}", col);
3570 }
3571 Ok(name_idx)
3572 })
3573 .collect::<Result<Vec<_>, _>>()?;
3574
3575 if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
3578 let cols: Vec<_> = desc.iter().collect();
3579 for &idx in &indices {
3580 let (col_name, col_type) = cols[idx];
3581 let scalar = &col_type.scalar_type;
3582 let is_valid = matches!(
3583 scalar,
3584 SqlScalarType::Bool
3586 | SqlScalarType::Int16
3587 | SqlScalarType::Int32
3588 | SqlScalarType::Int64
3589 | SqlScalarType::UInt16
3590 | SqlScalarType::UInt32
3591 | SqlScalarType::UInt64
3592 | SqlScalarType::Numeric { .. }
3594 | SqlScalarType::Date
3596 | SqlScalarType::Time
3597 | SqlScalarType::Timestamp { .. }
3598 | SqlScalarType::TimestampTz { .. }
3599 | SqlScalarType::Interval
3600 | SqlScalarType::MzTimestamp
3601 | SqlScalarType::String
3603 | SqlScalarType::Char { .. }
3604 | SqlScalarType::VarChar { .. }
3605 | SqlScalarType::PgLegacyChar
3606 | SqlScalarType::PgLegacyName
3607 | SqlScalarType::Bytes
3608 | SqlScalarType::Jsonb
3609 | SqlScalarType::Uuid
3611 | SqlScalarType::Oid
3612 | SqlScalarType::RegProc
3613 | SqlScalarType::RegType
3614 | SqlScalarType::RegClass
3615 | SqlScalarType::MzAclItem
3616 | SqlScalarType::AclItem
3617 | SqlScalarType::Int2Vector
3618 | SqlScalarType::Range { .. }
3620 );
3621 if !is_valid {
3622 return Err(PlanError::IcebergSinkUnsupportedKeyType {
3623 column: col_name.to_string(),
3624 column_type: format!("{:?}", scalar),
3625 });
3626 }
3627 }
3628 }
3629
3630 let is_valid_key = desc
3631 .typ()
3632 .keys
3633 .iter()
3634 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3635
3636 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3637 if key.not_enforced {
3638 scx.catalog
3639 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3640 key: key_columns.clone(),
3641 name: name.item.clone(),
3642 })
3643 } else {
3644 return Err(PlanError::UpsertSinkWithInvalidKey {
3645 name: from_name.full_name_str(),
3646 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3647 valid_keys: desc
3648 .typ()
3649 .keys
3650 .iter()
3651 .map(|key| {
3652 key.iter()
3653 .map(|col| desc.get_name(*col).as_str().into())
3654 .collect()
3655 })
3656 .collect(),
3657 });
3658 }
3659 }
3660 Some(indices)
3661 }
3662 CreateSinkConnection::Kafka { key: None, .. }
3663 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3664 };
3665
3666 if key_indices.is_some() && envelope == SinkEnvelope::Append {
3667 sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
3668 }
3669
3670 if envelope == SinkEnvelope::Append {
3672 if let CreateSinkConnection::Iceberg { .. } = &connection {
3673 use mz_storage_types::sinks::{
3674 ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
3675 };
3676 for (col_name, _) in desc.iter() {
3677 if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
3678 || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
3679 {
3680 sql_bail!(
3681 "column {} conflicts with the system column that MODE APPEND \
3682 adds to the Iceberg table",
3683 col_name.quoted()
3684 );
3685 }
3686 }
3687 }
3688 }
3689
3690 let headers_index = match &connection {
3691 CreateSinkConnection::Kafka {
3692 headers: Some(headers),
3693 ..
3694 } => {
3695 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3696
3697 match envelope {
3698 SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3699 SinkEnvelope::Debezium => {
3700 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3701 }
3702 };
3703
3704 let headers = normalize::column_name(headers.clone());
3705 let (idx, ty) = desc
3706 .get_by_name(&headers)
3707 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3708
3709 if desc.get_unambiguous_name(idx).is_none() {
3710 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3711 }
3712
3713 match &ty.scalar_type {
3714 SqlScalarType::Map { value_type, .. }
3715 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3716 _ => sql_bail!(
3717 "HEADERS column must have type map[text => text] or map[text => bytea]"
3718 ),
3719 }
3720
3721 Some(idx)
3722 }
3723 _ => None,
3724 };
3725
3726 let relation_key_indices = desc.typ().keys.get(0).cloned();
3728
3729 let key_desc_and_indices = key_indices.map(|key_indices| {
3730 let cols = desc
3731 .iter()
3732 .map(|(name, ty)| (name.clone(), ty.clone()))
3733 .collect::<Vec<_>>();
3734 let (names, types): (Vec<_>, Vec<_>) =
3735 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3736 let typ = SqlRelationType::new(types);
3737 (RelationDesc::new(typ, names), key_indices)
3738 });
3739
3740 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3741 return Err(PlanError::UpsertSinkWithoutKey);
3742 }
3743
3744 let CreateSinkOptionExtracted {
3745 snapshot,
3746 version,
3747 partition_strategy: _,
3748 seen: _,
3749 commit_interval,
3750 } = with_options.try_into()?;
3751
3752 let connection_builder = match connection {
3753 CreateSinkConnection::Kafka {
3754 connection,
3755 options,
3756 ..
3757 } => kafka_sink_builder(
3758 scx,
3759 connection,
3760 options,
3761 format,
3762 relation_key_indices,
3763 key_desc_and_indices,
3764 headers_index,
3765 desc.into_owned(),
3766 envelope,
3767 from.id(),
3768 commit_interval,
3769 )?,
3770 CreateSinkConnection::Iceberg {
3771 connection,
3772 aws_connection,
3773 options,
3774 ..
3775 } => iceberg_sink_builder(
3776 scx,
3777 connection,
3778 aws_connection,
3779 options,
3780 relation_key_indices,
3781 key_desc_and_indices,
3782 commit_interval,
3783 )?,
3784 };
3785
3786 let with_snapshot = snapshot.unwrap_or(true);
3788 let version = version.unwrap_or(0);
3790
3791 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3795 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3796
3797 Ok(Plan::CreateSink(CreateSinkPlan {
3798 name,
3799 sink: Sink {
3800 create_sql,
3801 from: from.global_id(),
3802 connection: connection_builder,
3803 envelope,
3804 version,
3805 commit_interval,
3806 },
3807 with_snapshot,
3808 if_not_exists,
3809 in_cluster: in_cluster.id(),
3810 }))
3811}
3812
3813fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3814 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3815
3816 let existing_keys = desc
3817 .typ()
3818 .keys
3819 .iter()
3820 .map(|key_columns| {
3821 key_columns
3822 .iter()
3823 .map(|col| desc.get_name(*col).as_str())
3824 .join(", ")
3825 })
3826 .join(", ");
3827
3828 sql_err!(
3829 "Key constraint ({}) conflicts with existing key ({})",
3830 user_keys,
3831 existing_keys
3832 )
3833}
3834
/// Typed form of the `CsrConfigOption` list attached to a Confluent Schema
/// Registry connection in a sink definition; populated by the `TryFrom`
/// impl below.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    // Option names already seen; used to reject duplicate specifications.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    // Explicit Avro fullname overrides for the generated key/value schemas.
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    // Whether nullable columns get `null` defaults in the generated schema.
    pub(crate) null_defaults: bool,
    // Doc comments to attach to the generated value/key schemas, keyed by
    // the type or field they document.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    // Schema-registry compatibility levels to set for the key/value subjects.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3848
impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    // Extracts and validates CSR options, rejecting duplicates. Unqualified
    // `DOC ON` comments (DocOnSchema::All) are buffered and merged in after
    // the loop so explicit KEY/VALUE doc options take precedence.
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            // Each option may be given at most once.
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            // Wraps an extraction error with the name of the offending option.
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            // Parses an optional string option value into a CSR compatibility
            // level (case-insensitive).
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    // A DOC ON may target either a specific column or a type.
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        // Apply unqualified doc comments wherever no explicit KEY/VALUE doc
        // option was given for the same target.
        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}
3944
3945fn iceberg_sink_builder(
3946 scx: &StatementContext,
3947 catalog_connection: ResolvedItemName,
3948 aws_connection: ResolvedItemName,
3949 options: Vec<IcebergSinkConfigOption<Aug>>,
3950 relation_key_indices: Option<Vec<usize>>,
3951 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3952 commit_interval: Option<Duration>,
3953) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3954 scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3955 let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3956 let catalog_connection_id = catalog_connection_item.id();
3957 let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3958 let aws_connection_id = aws_connection_item.id();
3959 if !matches!(
3960 catalog_connection_item.connection()?,
3961 Connection::IcebergCatalog(_)
3962 ) {
3963 sql_bail!(
3964 "{} is not an iceberg catalog connection",
3965 scx.catalog
3966 .resolve_full_name(catalog_connection_item.name())
3967 .to_string()
3968 .quoted()
3969 );
3970 };
3971
3972 if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3973 sql_bail!(
3974 "{} is not an AWS connection",
3975 scx.catalog
3976 .resolve_full_name(aws_connection_item.name())
3977 .to_string()
3978 .quoted()
3979 );
3980 }
3981
3982 let IcebergSinkConfigOptionExtracted {
3983 table,
3984 namespace,
3985 seen: _,
3986 }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3987
3988 let Some(table) = table else {
3989 sql_bail!("Iceberg sink must specify TABLE");
3990 };
3991 let Some(namespace) = namespace else {
3992 sql_bail!("Iceberg sink must specify NAMESPACE");
3993 };
3994 if commit_interval.is_none() {
3995 sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3996 }
3997
3998 Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3999 catalog_connection_id,
4000 catalog_connection: catalog_connection_id,
4001 aws_connection_id,
4002 aws_connection: aws_connection_id,
4003 table,
4004 namespace,
4005 relation_key_indices,
4006 key_desc_and_indices,
4007 }))
4008}
4009
/// Plans the connection half of a Kafka sink: validates the connection,
/// extracts and checks the Kafka-specific WITH options, plans the optional
/// `PARTITION BY` expression, and resolves the key/value formats (including
/// Avro schema generation against a schema registry).
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // The referenced connection must actually be a Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    // COMMIT INTERVAL is an Iceberg-only option.
    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // LEGACY IDS selects the historical id scheme and is mutually exclusive
    // with the explicit prefix options.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    // Validates that an optional numeric topic option is strictly positive
    // and converts it to `NonNeg`.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Resolves a CSR connection reference and its options for Avro formats,
    // rejecting source-only options (SEED, KEY/VALUE STRATEGY).
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // Fullname overrides must be given for both schemas or neither.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps a FORMAT clause to the concrete key or value serialization,
    // generating Avro schemas where necessary.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional PARTITION BY expression against the value relation;
    // under ENVELOPE DEBEZIUM, only key columns may be referenced.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    // Non-key columns raise an error if the expression
                    // references them.
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression must be castable to uint8 (the partition hash).
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // Resolve key/value formats: a bare format applies to both the key (if
    // any) and the value.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4295
4296pub fn describe_create_index(
4297 _: &StatementContext,
4298 _: CreateIndexStatement<Aug>,
4299) -> Result<StatementDesc, PlanError> {
4300 Ok(StatementDesc::new(None))
4301}
4302
4303pub fn plan_create_index(
4304 scx: &StatementContext,
4305 mut stmt: CreateIndexStatement<Aug>,
4306) -> Result<Plan, PlanError> {
4307 let CreateIndexStatement {
4308 name,
4309 on_name,
4310 in_cluster,
4311 key_parts,
4312 with_options,
4313 if_not_exists,
4314 } = &mut stmt;
4315 let on = scx.get_item_by_resolved_name(on_name)?;
4316
4317 {
4318 use CatalogItemType::*;
4319 match on.item_type() {
4320 Table | Source | View | MaterializedView | ContinualTask => {
4321 if on.replacement_target().is_some() {
4322 sql_bail!(
4323 "index cannot be created on {} because it is a replacement {}",
4324 on_name.full_name_str(),
4325 on.item_type(),
4326 );
4327 }
4328 }
4329 Sink | Index | Type | Func | Secret | Connection => {
4330 sql_bail!(
4331 "index cannot be created on {} because it is a {}",
4332 on_name.full_name_str(),
4333 on.item_type(),
4334 );
4335 }
4336 }
4337 }
4338
4339 let on_desc = on.relation_desc().expect("item type checked above");
4340
4341 let filled_key_parts = match key_parts {
4342 Some(kp) => kp.to_vec(),
4343 None => {
4344 let mut name_counts = BTreeMap::new();
4349 for name in on_desc.iter_names() {
4350 *name_counts.entry(name).or_insert(0usize) += 1;
4351 }
4352 let key = on_desc.typ().default_key();
4353 key.iter()
4354 .map(|i| {
4355 let name = on_desc.get_name(*i);
4356 if name_counts.get(name).copied() == Some(1) {
4357 Expr::Identifier(vec![name.clone().into()])
4358 } else {
4359 Expr::Value(Value::Number((i + 1).to_string()))
4360 }
4361 })
4362 .collect()
4363 }
4364 };
4365 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4366
4367 let index_name = if let Some(name) = name {
4368 QualifiedItemName {
4369 qualifiers: on.name().qualifiers.clone(),
4370 item: normalize::ident(name.clone()),
4371 }
4372 } else {
4373 let mut idx_name = QualifiedItemName {
4374 qualifiers: on.name().qualifiers.clone(),
4375 item: on.name().item.clone(),
4376 };
4377 if key_parts.is_none() {
4378 idx_name.item += "_primary_idx";
4380 } else {
4381 let index_name_col_suffix = keys
4384 .iter()
4385 .map(|k| match k {
4386 mz_expr::MirScalarExpr::Column(i, name) => {
4387 match (on_desc.get_unambiguous_name(*i), &name.0) {
4388 (Some(col_name), _) => col_name.to_string(),
4389 (None, Some(name)) => name.to_string(),
4390 (None, None) => format!("{}", i + 1),
4391 }
4392 }
4393 _ => "expr".to_string(),
4394 })
4395 .join("_");
4396 write!(idx_name.item, "_{index_name_col_suffix}_idx")
4397 .expect("write on strings cannot fail");
4398 idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4399 }
4400
4401 if !*if_not_exists {
4402 scx.catalog.find_available_name(idx_name)
4403 } else {
4404 idx_name
4405 }
4406 };
4407
4408 let full_name = scx.catalog.resolve_full_name(&index_name);
4410 let partial_name = PartialItemName::from(full_name.clone());
4411 if let (Ok(item), false, false) = (
4419 scx.catalog.resolve_item_or_type(&partial_name),
4420 *if_not_exists,
4421 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4422 ) {
4423 return Err(PlanError::ItemAlreadyExists {
4424 name: full_name.to_string(),
4425 item_type: item.item_type(),
4426 });
4427 }
4428
4429 let options = plan_index_options(scx, with_options.clone())?;
4430 let cluster_id = match in_cluster {
4431 None => scx.resolve_cluster(None)?.id(),
4432 Some(in_cluster) => in_cluster.id,
4433 };
4434
4435 *in_cluster = Some(ResolvedClusterName {
4436 id: cluster_id,
4437 print_name: None,
4438 });
4439
4440 *name = Some(Ident::new(index_name.item.clone())?);
4442 *key_parts = Some(filled_key_parts);
4443 let if_not_exists = *if_not_exists;
4444
4445 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4446 let compaction_window = options.iter().find_map(|o| {
4447 #[allow(irrefutable_let_patterns)]
4448 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4449 Some(lcw.clone())
4450 } else {
4451 None
4452 }
4453 });
4454
4455 Ok(Plan::CreateIndex(CreateIndexPlan {
4456 name: index_name,
4457 index: Index {
4458 create_sql,
4459 on: on.global_id(),
4460 keys,
4461 cluster_id,
4462 compaction_window,
4463 },
4464 if_not_exists,
4465 }))
4466}
4467
4468pub fn describe_create_type(
4469 _: &StatementContext,
4470 _: CreateTypeStatement<Aug>,
4471) -> Result<StatementDesc, PlanError> {
4472 Ok(StatementDesc::new(None))
4473}
4474
/// Plans `CREATE TYPE`, producing a custom `LIST`, `MAP`, or record type.
///
/// The normalized statement text is recorded as `create_sql` before the
/// statement is consumed, and the planned type references other catalog
/// types by ID plus their type modifiers.
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    /// Checks that `data_type` may be referenced by a custom type, returning
    /// the referenced catalog item's ID and its type modifiers.
    ///
    /// Only named (catalog-resident) types of class "type" are allowed;
    /// `char` is rejected. `as_type` and `key` are used solely to build
    /// error messages (e.g. `"LIST "` / `"ELEMENT TYPE"`).
    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            // The referenced item exists but is not a type (e.g. a table).
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate the type modifiers (e.g. precision/scale) by
                // attempting the conversion to a scalar type; the result
                // itself is not needed here.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            // Each record field is validated the same way as list/map
            // components; the field name doubles as the error-message key.
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Types share a namespace with other catalog items; reject the name if an
    // existing item of a conflicting class already uses it.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}
4591
// Option parser for `CREATE TYPE ... AS LIST (...)`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Option parser for `CREATE TYPE ... AS MAP (...)`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4599
/// The planned change carried by an `ALTER ROLE` statement: either a set of
/// role attributes or a single role-default variable change.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}
4605
/// Role attributes as resolved by `plan_role_attributes`.
///
/// `None` for any field means the statement did not mention that attribute.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    /// New password for the role, when `PASSWORD '...'` was given.
    pub password: Option<Password>,
    /// SCRAM iteration count, captured from the system vars at planning time
    /// whenever a password is set.
    pub scram_iterations: Option<NonZeroU32>,
    /// `Some(true)` when `PASSWORD NULL` was specified.
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}
4618
/// Validates a list of `RoleAttribute`s and folds them into a
/// `PlannedRoleAttributes`.
///
/// Rejects conflicting or redundant options, attributes Materialize never
/// supports (`CREATECLUSTER`, `CREATEDB`, `CREATEROLE` — system privileges
/// replace them), and `NOINHERIT` (parsed but unsupported).
fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            // Guard arms first: re-specifying an already-set attribute is an
            // error. These must precede the plain arms below so the guards
            // are consulted before the value is assigned.
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    // Setting a password also pins the SCRAM iteration count
                    // configured at planning time.
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    // `PASSWORD NULL` clears the password.
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    // NOINHERIT parses but is not implemented.
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}
4707
/// A planned role-default variable change: `ALTER ROLE ... SET/RESET <var>`.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}
4713
4714impl PlannedRoleVariable {
4715 pub fn name(&self) -> &str {
4716 match self {
4717 PlannedRoleVariable::Set { name, .. } => name,
4718 PlannedRoleVariable::Reset { name } => name,
4719 }
4720 }
4721}
4722
4723fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4724 let plan = match variable {
4725 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4726 name: name.to_string(),
4727 value: scl::plan_set_variable_to(value)?,
4728 },
4729 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4730 name: name.to_string(),
4731 },
4732 };
4733 Ok(plan)
4734}
4735
4736pub fn describe_create_role(
4737 _: &StatementContext,
4738 _: CreateRoleStatement,
4739) -> Result<StatementDesc, PlanError> {
4740 Ok(StatementDesc::new(None))
4741}
4742
4743pub fn plan_create_role(
4744 scx: &StatementContext,
4745 CreateRoleStatement { name, options }: CreateRoleStatement,
4746) -> Result<Plan, PlanError> {
4747 let attributes = plan_role_attributes(options, scx)?;
4748 Ok(Plan::CreateRole(CreateRolePlan {
4749 name: normalize::ident(name),
4750 attributes: attributes.into(),
4751 }))
4752}
4753
4754pub fn plan_create_network_policy(
4755 ctx: &StatementContext,
4756 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4757) -> Result<Plan, PlanError> {
4758 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4759 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4760
4761 let Some(rule_defs) = policy_options.rules else {
4762 sql_bail!("RULES must be specified when creating network policies.");
4763 };
4764
4765 let mut rules = vec![];
4766 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4767 let NetworkPolicyRuleOptionExtracted {
4768 seen: _,
4769 direction,
4770 action,
4771 address,
4772 } = options.try_into()?;
4773 let (direction, action, address) = match (direction, action, address) {
4774 (Some(direction), Some(action), Some(address)) => (
4775 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4776 NetworkPolicyRuleAction::try_from(action.as_str())?,
4777 PolicyAddress::try_from(address.as_str())?,
4778 ),
4779 (_, _, _) => {
4780 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4781 }
4782 };
4783 rules.push(NetworkPolicyRule {
4784 name: normalize::ident(name),
4785 direction,
4786 action,
4787 address,
4788 });
4789 }
4790
4791 if rules.len()
4792 > ctx
4793 .catalog
4794 .system_vars()
4795 .max_rules_per_network_policy()
4796 .try_into()?
4797 {
4798 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4799 }
4800
4801 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4802 name: normalize::ident(name),
4803 rules,
4804 }))
4805}
4806
4807pub fn plan_alter_network_policy(
4808 ctx: &StatementContext,
4809 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4810) -> Result<Plan, PlanError> {
4811 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4812
4813 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4814 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4815
4816 let Some(rule_defs) = policy_options.rules else {
4817 sql_bail!("RULES must be specified when creating network policies.");
4818 };
4819
4820 let mut rules = vec![];
4821 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4822 let NetworkPolicyRuleOptionExtracted {
4823 seen: _,
4824 direction,
4825 action,
4826 address,
4827 } = options.try_into()?;
4828
4829 let (direction, action, address) = match (direction, action, address) {
4830 (Some(direction), Some(action), Some(address)) => (
4831 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4832 NetworkPolicyRuleAction::try_from(action.as_str())?,
4833 PolicyAddress::try_from(address.as_str())?,
4834 ),
4835 (_, _, _) => {
4836 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4837 }
4838 };
4839 rules.push(NetworkPolicyRule {
4840 name: normalize::ident(name),
4841 direction,
4842 action,
4843 address,
4844 });
4845 }
4846 if rules.len()
4847 > ctx
4848 .catalog
4849 .system_vars()
4850 .max_rules_per_network_policy()
4851 .try_into()?
4852 {
4853 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4854 }
4855
4856 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4857 id: policy.id(),
4858 name: normalize::ident(name),
4859 rules,
4860 }))
4861}
4862
4863pub fn describe_create_cluster(
4864 _: &StatementContext,
4865 _: CreateClusterStatement<Aug>,
4866) -> Result<StatementDesc, PlanError> {
4867 Ok(StatementDesc::new(None))
4868}
4869
// Option parser for the `CREATE CLUSTER (...)` option list.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Option parser for `CREATE/ALTER NETWORK POLICY (...)`.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Option parser for a single network policy rule definition.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// Option parser for `ALTER CLUSTER ... WITH (...)`.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// Option parser for the `WAIT UNTIL READY (...)` sub-options of ALTER CLUSTER.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Option parser for cluster-scoped optimizer feature overrides
// (`FEATURES (...)`, system users only).
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4923
/// Plans `CREATE CLUSTER`.
///
/// For managed clusters, additionally verifies that the plan survives a
/// round trip: the plan is unplanned back to SQL, re-parsed, re-resolved,
/// and re-planned, and the result must equal the original plan. This guards
/// `unplan_create_cluster` against drifting from the planner.
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    // Only managed clusters can be unplanned (unmanaged is rejected by
    // `unplan_create_cluster`), so the roundtrip check applies only here.
    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}
4963
/// Plans the body of a `CREATE CLUSTER` statement, without the replan
/// roundtrip check performed by `plan_create_cluster`.
///
/// Clusters are either *managed* (Materialize provisions replicas from SIZE
/// and REPLICATION FACTOR) or *unmanaged* (the user lists REPLICAS
/// explicitly); the two modes accept disjoint option sets.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Absent an explicit MANAGED option, the cluster is managed unless
    // REPLICAS was given.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system users.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    // `OptionalString(None)` (i.e. `WORKLOAD CLASS NULL`) collapses to None.
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        // DISK is deprecated: rejected for sizes where disk is always on,
        // and a deprecation notice is emitted otherwise.
        if disk.is_some() {
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        // REPLICATION FACTOR is only meaningful for MANUAL schedules; other
        // schedules require the refresh-schedule feature flag and start with
        // zero replicas (NOTE(review): presumably the scheduler adjusts the
        // replica count later — confirm against the cluster controller).
        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Cluster-scoped optimizer feature overrides (system users only,
        // enforced above).
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        // Unmanaged clusters: REPLICAS is mandatory and all managed-only
        // options are rejected.
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
5130
/// Reconstructs a `CREATE CLUSTER` statement from a `CreateClusterPlan` —
/// the inverse of `plan_create_cluster_inner` for managed clusters.
///
/// Unmanaged clusters are not supported. `plan_create_cluster` uses this to
/// verify plan/unplan roundtrips.
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Exhaustively destructure the overrides so adding a new field
            // forces a compile error here; fields not settable via
            // `FEATURES (...)` are explicitly ignored.
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
                enable_case_literal_transform: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                // `seen` is only meaningful when extracting from SQL options.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // Planning defaults missing AVAILABILITY ZONES to `vec![]`, so an
            // empty list unplans to "option absent".
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // For non-MANUAL schedules planning forbids an explicit
            // REPLICATION FACTOR, so it is omitted here; the stored factor is
            // expected to be at most 1 in that case.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // `seen` is only meaningful when extracting from SQL options.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
5232
// Option parser for a replica definition's option list
// (`CREATE CLUSTER REPLICA ... (...)` and inline REPLICAS definitions).
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
5248
/// Plans the options of a replica definition into a `ReplicaConfig`.
///
/// Replicas are either *orchestrated* (SIZE given; Materialize provisions
/// the processes) or *unorchestrated* (explicit STORAGECTL/COMPUTECTL
/// addresses, gated behind the unsafe unorchestrated-replicas feature flag).
/// Mixing options from both groups is an error.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    // Dispatch on which option group was used; each arm below corresponds to
    // one valid (or invalid) combination.
    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // No SIZE and no addresses: nothing identifies the replica's shape.
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        // Orchestrated replica: SIZE given, no explicit addresses.
        (Some(size), availability_zone, billed_as, None, None) => {
            // DISK is deprecated: rejected for sizes where disk is always
            // on, and a deprecation notice is emitted otherwise.
            if disk.is_some() {
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        // Unorchestrated replica: explicit addresses, no orchestrated-only
        // options. Gated behind an unsafe feature flag.
        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            // Each process hosts one storagectl and one computectl endpoint,
            // so the address lists must pair up.
            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        // Anything else mixes orchestrated and unorchestrated options.
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5341
5342fn plan_compute_replica_config(
5346 introspection_interval: Option<OptionalDuration>,
5347 introspection_debugging: bool,
5348) -> Result<ComputeReplicaConfig, PlanError> {
5349 let introspection_interval = introspection_interval
5350 .map(|OptionalDuration(i)| i)
5351 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5352 let introspection = match introspection_interval {
5353 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5354 interval,
5355 debugging: introspection_debugging,
5356 }),
5357 None if introspection_debugging => {
5358 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5359 }
5360 None => None,
5361 };
5362 let compute = ComputeReplicaConfig { introspection };
5363 Ok(compute)
5364}
5365
5366fn unplan_compute_replica_config(
5370 compute_replica_config: ComputeReplicaConfig,
5371) -> (Option<OptionalDuration>, bool) {
5372 match compute_replica_config.introspection {
5373 Some(ComputeReplicaIntrospectionConfig {
5374 debugging,
5375 interval,
5376 }) => (Some(OptionalDuration(Some(interval))), debugging),
5377 None => (Some(OptionalDuration(None)), false),
5378 }
5379}
5380
5381fn plan_cluster_schedule(
5385 schedule: ClusterScheduleOptionValue,
5386) -> Result<ClusterSchedule, PlanError> {
5387 Ok(match schedule {
5388 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5389 ClusterScheduleOptionValue::Refresh {
5391 hydration_time_estimate: None,
5392 } => ClusterSchedule::Refresh {
5393 hydration_time_estimate: Duration::from_millis(0),
5394 },
5395 ClusterScheduleOptionValue::Refresh {
5397 hydration_time_estimate: Some(interval_value),
5398 } => {
5399 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5400 if interval.as_microseconds() < 0 {
5401 sql_bail!(
5402 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5403 interval
5404 );
5405 }
5406 if interval.months != 0 {
5407 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5411 }
5412 let duration = interval.duration()?;
5413 if u64::try_from(duration.as_millis()).is_err()
5414 || Interval::from_duration(&duration).is_err()
5415 {
5416 sql_bail!("HYDRATION TIME ESTIMATE too large");
5417 }
5418 ClusterSchedule::Refresh {
5419 hydration_time_estimate: duration,
5420 }
5421 }
5422 })
5423}
5424
5425fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5429 match schedule {
5430 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5431 ClusterSchedule::Refresh {
5432 hydration_time_estimate,
5433 } => {
5434 let interval = Interval::from_duration(&hydration_time_estimate)
5435 .expect("planning ensured that this is convertible back to Interval");
5436 let interval_value = literal::unplan_interval(&interval);
5437 ClusterScheduleOptionValue::Refresh {
5438 hydration_time_estimate: Some(interval_value),
5439 }
5440 }
5441 }
5442}
5443
5444pub fn describe_create_cluster_replica(
5445 _: &StatementContext,
5446 _: CreateClusterReplicaStatement<Aug>,
5447) -> Result<StatementDesc, PlanError> {
5448 Ok(StatementDesc::new(None))
5449}
5450
/// Plans `CREATE CLUSTER REPLICA`, adding a replica to an existing cluster.
pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    // A cluster that hosts objects requiring a single replica cannot grow
    // past one replica; report the counts that would result.
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        // Internal replicas must not collide with the reserved
        // managed-replica naming pattern.
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        // Non-internal replicas may only be added to unmanaged clusters;
        // managed clusters own their replica set.
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}
5487
5488pub fn describe_create_secret(
5489 _: &StatementContext,
5490 _: CreateSecretStatement<Aug>,
5491) -> Result<StatementDesc, PlanError> {
5492 Ok(StatementDesc::new(None))
5493}
5494
5495pub fn plan_create_secret(
5496 scx: &StatementContext,
5497 stmt: CreateSecretStatement<Aug>,
5498) -> Result<Plan, PlanError> {
5499 let CreateSecretStatement {
5500 name,
5501 if_not_exists,
5502 value,
5503 } = &stmt;
5504
5505 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5506 let mut create_sql_statement = stmt.clone();
5507 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5508 let create_sql =
5509 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5510 let secret_as = query::plan_secret_as(scx, value.clone())?;
5511
5512 let secret = Secret {
5513 create_sql,
5514 secret_as,
5515 };
5516
5517 Ok(Plan::CreateSecret(CreateSecretPlan {
5518 name,
5519 secret,
5520 if_not_exists: *if_not_exists,
5521 }))
5522}
5523
5524pub fn describe_create_connection(
5525 _: &StatementContext,
5526 _: CreateConnectionStatement<Aug>,
5527) -> Result<StatementDesc, PlanError> {
5528 Ok(StatementDesc::new(None))
5529}
5530
5531generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5532
5533pub fn plan_create_connection(
5534 scx: &StatementContext,
5535 mut stmt: CreateConnectionStatement<Aug>,
5536) -> Result<Plan, PlanError> {
5537 let CreateConnectionStatement {
5538 name,
5539 connection_type,
5540 values,
5541 if_not_exists,
5542 with_options,
5543 } = stmt.clone();
5544 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5545 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5546 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5547
5548 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5549 if options.validate.is_some() {
5550 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5551 }
5552 let validate = match options.validate {
5553 Some(val) => val,
5554 None => {
5555 scx.catalog
5556 .system_vars()
5557 .enable_default_connection_validation()
5558 && details.to_connection().validate_by_default()
5559 }
5560 };
5561
5562 let full_name = scx.catalog.resolve_full_name(&name);
5564 let partial_name = PartialItemName::from(full_name.clone());
5565 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5566 return Err(PlanError::ItemAlreadyExists {
5567 name: full_name.to_string(),
5568 item_type: item.item_type(),
5569 });
5570 }
5571
5572 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5575 stmt.values.retain(|v| {
5576 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5577 });
5578 stmt.values.push(ConnectionOption {
5579 name: ConnectionOptionName::PublicKey1,
5580 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5581 });
5582 stmt.values.push(ConnectionOption {
5583 name: ConnectionOptionName::PublicKey2,
5584 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5585 });
5586 }
5587 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5588
5589 let plan = CreateConnectionPlan {
5590 name,
5591 if_not_exists,
5592 connection: crate::plan::Connection {
5593 create_sql,
5594 details,
5595 },
5596 validate,
5597 };
5598 Ok(Plan::CreateConnection(plan))
5599}
5600
5601fn plan_drop_database(
5602 scx: &StatementContext,
5603 if_exists: bool,
5604 name: &UnresolvedDatabaseName,
5605 cascade: bool,
5606) -> Result<Option<DatabaseId>, PlanError> {
5607 Ok(match resolve_database(scx, name, if_exists)? {
5608 Some(database) => {
5609 if !cascade && database.has_schemas() {
5610 sql_bail!(
5611 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5612 name,
5613 );
5614 }
5615 Some(database.id())
5616 }
5617 None => None,
5618 })
5619}
5620
5621pub fn describe_drop_objects(
5622 _: &StatementContext,
5623 _: DropObjectsStatement,
5624) -> Result<StatementDesc, PlanError> {
5625 Ok(StatementDesc::new(None))
5626}
5627
5628pub fn plan_drop_objects(
5629 scx: &mut StatementContext,
5630 DropObjectsStatement {
5631 object_type,
5632 if_exists,
5633 names,
5634 cascade,
5635 }: DropObjectsStatement,
5636) -> Result<Plan, PlanError> {
5637 assert_ne!(
5638 object_type,
5639 mz_sql_parser::ast::ObjectType::Func,
5640 "rejected in parser"
5641 );
5642 let object_type = object_type.into();
5643
5644 let mut referenced_ids = Vec::new();
5645 for name in names {
5646 let id = match &name {
5647 UnresolvedObjectName::Cluster(name) => {
5648 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5649 }
5650 UnresolvedObjectName::ClusterReplica(name) => {
5651 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5652 }
5653 UnresolvedObjectName::Database(name) => {
5654 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5655 }
5656 UnresolvedObjectName::Schema(name) => {
5657 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5658 }
5659 UnresolvedObjectName::Role(name) => {
5660 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5661 }
5662 UnresolvedObjectName::Item(name) => {
5663 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5664 .map(ObjectId::Item)
5665 }
5666 UnresolvedObjectName::NetworkPolicy(name) => {
5667 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5668 }
5669 };
5670 match id {
5671 Some(id) => referenced_ids.push(id),
5672 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5673 name: name.to_ast_string_simple(),
5674 object_type,
5675 }),
5676 }
5677 }
5678 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5679
5680 Ok(Plan::DropObjects(DropObjectsPlan {
5681 referenced_ids,
5682 drop_ids,
5683 object_type,
5684 }))
5685}
5686
5687fn plan_drop_schema(
5688 scx: &StatementContext,
5689 if_exists: bool,
5690 name: &UnresolvedSchemaName,
5691 cascade: bool,
5692) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5693 let normalized = normalize::unresolved_schema_name(name.clone())?;
5697 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5698 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5699 }
5700
5701 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5702 Some((database_spec, schema_spec)) => {
5703 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5704 sql_bail!(
5705 "cannot drop schema {name} because it is required by the database system",
5706 );
5707 }
5708 if let SchemaSpecifier::Temporary = schema_spec {
5709 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5710 }
5711 let schema = scx.get_schema(&database_spec, &schema_spec);
5712 if !cascade && schema.has_items() {
5713 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5714 sql_bail!(
5715 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5716 full_schema_name
5717 );
5718 }
5719 Some((database_spec, schema_spec))
5720 }
5721 None => None,
5722 })
5723}
5724
5725fn plan_drop_role(
5726 scx: &StatementContext,
5727 if_exists: bool,
5728 name: &Ident,
5729) -> Result<Option<RoleId>, PlanError> {
5730 match scx.catalog.resolve_role(name.as_str()) {
5731 Ok(role) => {
5732 let id = role.id();
5733 if &id == scx.catalog.active_role_id() {
5734 sql_bail!("current role cannot be dropped");
5735 }
5736 for role in scx.catalog.get_roles() {
5737 for (member_id, grantor_id) in role.membership() {
5738 if &id == grantor_id {
5739 let member_role = scx.catalog.get_role(member_id);
5740 sql_bail!(
5741 "cannot drop role {}: still depended up by membership of role {} in role {}",
5742 name.as_str(),
5743 role.name(),
5744 member_role.name()
5745 );
5746 }
5747 }
5748 }
5749 Ok(Some(role.id()))
5750 }
5751 Err(_) if if_exists => Ok(None),
5752 Err(e) => Err(e.into()),
5753 }
5754}
5755
5756fn plan_drop_cluster(
5757 scx: &StatementContext,
5758 if_exists: bool,
5759 name: &Ident,
5760 cascade: bool,
5761) -> Result<Option<ClusterId>, PlanError> {
5762 Ok(match resolve_cluster(scx, name, if_exists)? {
5763 Some(cluster) => {
5764 if !cascade && !cluster.bound_objects().is_empty() {
5765 return Err(PlanError::DependentObjectsStillExist {
5766 object_type: "cluster".to_string(),
5767 object_name: cluster.name().to_string(),
5768 dependents: Vec::new(),
5769 });
5770 }
5771 Some(cluster.id())
5772 }
5773 None => None,
5774 })
5775}
5776
5777fn plan_drop_network_policy(
5778 scx: &StatementContext,
5779 if_exists: bool,
5780 name: &Ident,
5781) -> Result<Option<NetworkPolicyId>, PlanError> {
5782 match scx.catalog.resolve_network_policy(name.as_str()) {
5783 Ok(policy) => {
5784 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5787 Err(PlanError::NetworkPolicyInUse)
5788 } else {
5789 Ok(Some(policy.id()))
5790 }
5791 }
5792 Err(_) if if_exists => Ok(None),
5793 Err(e) => Err(e.into()),
5794 }
5795}
5796
5797fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5800 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5802 false
5803 } else {
5804 cluster.bound_objects().iter().any(|id| {
5806 let item = scx.catalog.get_item(id);
5807 matches!(
5808 item.item_type(),
5809 CatalogItemType::Sink | CatalogItemType::Source
5810 )
5811 })
5812 }
5813}
5814
5815fn plan_drop_cluster_replica(
5816 scx: &StatementContext,
5817 if_exists: bool,
5818 name: &QualifiedReplica,
5819) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5820 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5821 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5822}
5823
5824fn plan_drop_item(
5826 scx: &StatementContext,
5827 object_type: ObjectType,
5828 if_exists: bool,
5829 name: UnresolvedItemName,
5830 cascade: bool,
5831) -> Result<Option<CatalogItemId>, PlanError> {
5832 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5833 Ok(r) => r,
5834 Err(PlanError::MismatchedObjectType {
5836 name,
5837 is_type: ObjectType::MaterializedView,
5838 expected_type: ObjectType::View,
5839 }) => {
5840 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5841 }
5842 e => e?,
5843 };
5844
5845 Ok(match resolved {
5846 Some(catalog_item) => {
5847 if catalog_item.id().is_system() {
5848 sql_bail!(
5849 "cannot drop {} {} because it is required by the database system",
5850 catalog_item.item_type(),
5851 scx.catalog.minimal_qualification(catalog_item.name()),
5852 );
5853 }
5854
5855 if !cascade {
5856 for id in catalog_item.used_by() {
5857 let dep = scx.catalog.get_item(id);
5858 if dependency_prevents_drop(object_type, dep) {
5859 return Err(PlanError::DependentObjectsStillExist {
5860 object_type: catalog_item.item_type().to_string(),
5861 object_name: scx
5862 .catalog
5863 .minimal_qualification(catalog_item.name())
5864 .to_string(),
5865 dependents: vec![(
5866 dep.item_type().to_string(),
5867 scx.catalog.minimal_qualification(dep.name()).to_string(),
5868 )],
5869 });
5870 }
5871 }
5872 }
5875 Some(catalog_item.id())
5876 }
5877 None => None,
5878 })
5879}
5880
5881fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5883 match object_type {
5884 ObjectType::Type => true,
5885 ObjectType::Table
5886 | ObjectType::View
5887 | ObjectType::MaterializedView
5888 | ObjectType::Source
5889 | ObjectType::Sink
5890 | ObjectType::Index
5891 | ObjectType::Role
5892 | ObjectType::Cluster
5893 | ObjectType::ClusterReplica
5894 | ObjectType::Secret
5895 | ObjectType::Connection
5896 | ObjectType::Database
5897 | ObjectType::Schema
5898 | ObjectType::Func
5899 | ObjectType::ContinualTask
5900 | ObjectType::NetworkPolicy => match dep.item_type() {
5901 CatalogItemType::Func
5902 | CatalogItemType::Table
5903 | CatalogItemType::Source
5904 | CatalogItemType::View
5905 | CatalogItemType::MaterializedView
5906 | CatalogItemType::Sink
5907 | CatalogItemType::Type
5908 | CatalogItemType::Secret
5909 | CatalogItemType::Connection
5910 | CatalogItemType::ContinualTask => true,
5911 CatalogItemType::Index => false,
5912 },
5913 }
5914}
5915
5916pub fn describe_alter_index_options(
5917 _: &StatementContext,
5918 _: AlterIndexStatement<Aug>,
5919) -> Result<StatementDesc, PlanError> {
5920 Ok(StatementDesc::new(None))
5921}
5922
5923pub fn describe_drop_owned(
5924 _: &StatementContext,
5925 _: DropOwnedStatement<Aug>,
5926) -> Result<StatementDesc, PlanError> {
5927 Ok(StatementDesc::new(None))
5928}
5929
/// Plans `DROP OWNED BY <role> [, ...]`.
///
/// Sweeps the entire catalog and collects, for the named roles: every object
/// they own (to drop), every privilege granted *to* them on any object (to
/// revoke), and every default privilege they define or are the grantee of
/// (to revoke). Without `CASCADE`, the plan is refused whenever dropping an
/// owned object would also take down a dependent object owned by some other
/// role.
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    // Queues a revoke for every privilege on `object_id` whose grantee is one
    // of the target roles.
    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Cluster replicas owned by a target role are dropped. (No privilege
    // revokes are collected for replicas.)
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters.
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            // Without CASCADE, refuse if the cluster hosts objects owned by
            // roles outside the target set.
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Catalog items (tables, views, sources, sinks, ...).
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                // Refuse if any non-owned item both depends on this one and
                // would block a plain DROP (dependent indexes never block;
                // see `dependency_prevents_drop`).
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string()
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                // If the item has an associated progress collection, that
                // collection's dependents count against this item too.
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas. Temporary schemas are skipped entirely (neither dropped nor
    // revoked).
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases.
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                // A database may only be dropped here if every schema in it
                // is also owned by a target role.
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Network policies.
    for network_policy in scx.catalog.get_network_policies() {
        if role_ids.contains(&network_policy.owner_id()) {
            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
        }
        update_privilege_revokes(
            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
            network_policy.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System-wide privileges granted to the target roles.
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    // Default privileges where a target role is either the defining role or
    // the grantee.
    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    // Expand to transitive dependents, then refuse if the expanded set sweeps
    // in any system-owned object.
    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}
6169
6170fn plan_retain_history_option(
6171 scx: &StatementContext,
6172 retain_history: Option<OptionalDuration>,
6173) -> Result<Option<CompactionWindow>, PlanError> {
6174 if let Some(OptionalDuration(lcw)) = retain_history {
6175 Ok(Some(plan_retain_history(scx, lcw)?))
6176 } else {
6177 Ok(None)
6178 }
6179}
6180
/// Plans a `RETAIN HISTORY` value into a [`CompactionWindow`].
///
/// The option as a whole is gated behind `ENABLE_LOGICAL_COMPACTION_WINDOW`.
/// `lcw` semantics: `Some(d)` retains `d` of history; `None` retains history
/// forever (compaction disabled), which additionally requires
/// `ENABLE_UNLIMITED_RETAIN_HISTORY`.
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A literal zero duration is treated as an internal error here.
        // NOTE(review): presumably an explicit `0` is normalized to `None`
        // before reaching this function (e.g. by `OptionalDuration` parsing)
        // — confirm against the option-extraction code.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Windows shorter than the default compaction window are only
            // permitted when the unlimited-retain-history flag is enabled.
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // `None` disables compaction entirely (unlimited history), gated
        // behind the same flag.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}
6230
6231generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6232
6233fn plan_index_options(
6234 scx: &StatementContext,
6235 with_opts: Vec<IndexOption<Aug>>,
6236) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6237 if !with_opts.is_empty() {
6238 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6240 }
6241
6242 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6243
6244 let mut out = Vec::with_capacity(1);
6245 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6246 out.push(crate::plan::IndexOption::RetainHistory(cw));
6247 }
6248 Ok(out)
6249}
6250
// Extracts the `WITH (...)` options accepted by `CREATE TABLE`:
// `PARTITION BY`, `RETAIN HISTORY`, and a test-only redaction option.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
6257
6258fn plan_table_options(
6259 scx: &StatementContext,
6260 desc: &RelationDesc,
6261 with_opts: Vec<TableOption<Aug>>,
6262) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6263 let TableOptionExtracted {
6264 partition_by,
6265 retain_history,
6266 redacted_test,
6267 ..
6268 }: TableOptionExtracted = with_opts.try_into()?;
6269
6270 if let Some(partition_by) = partition_by {
6271 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6272 check_partition_by(desc, partition_by)?;
6273 }
6274
6275 if redacted_test.is_some() {
6276 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6277 }
6278
6279 let mut out = Vec::with_capacity(1);
6280 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6281 out.push(crate::plan::TableOption::RetainHistory(cw));
6282 }
6283 Ok(out)
6284}
6285
6286pub fn plan_alter_index_options(
6287 scx: &mut StatementContext,
6288 AlterIndexStatement {
6289 index_name,
6290 if_exists,
6291 action,
6292 }: AlterIndexStatement<Aug>,
6293) -> Result<Plan, PlanError> {
6294 let object_type = ObjectType::Index;
6295 match action {
6296 AlterIndexAction::ResetOptions(options) => {
6297 let mut options = options.into_iter();
6298 if let Some(opt) = options.next() {
6299 match opt {
6300 IndexOptionName::RetainHistory => {
6301 if options.next().is_some() {
6302 sql_bail!("RETAIN HISTORY must be only option");
6303 }
6304 return alter_retain_history(
6305 scx,
6306 object_type,
6307 if_exists,
6308 UnresolvedObjectName::Item(index_name),
6309 None,
6310 );
6311 }
6312 }
6313 }
6314 sql_bail!("expected option");
6315 }
6316 AlterIndexAction::SetOptions(options) => {
6317 let mut options = options.into_iter();
6318 if let Some(opt) = options.next() {
6319 match opt.name {
6320 IndexOptionName::RetainHistory => {
6321 if options.next().is_some() {
6322 sql_bail!("RETAIN HISTORY must be only option");
6323 }
6324 return alter_retain_history(
6325 scx,
6326 object_type,
6327 if_exists,
6328 UnresolvedObjectName::Item(index_name),
6329 opt.value,
6330 );
6331 }
6332 }
6333 }
6334 sql_bail!("expected option");
6335 }
6336 }
6337}
6338
6339pub fn describe_alter_cluster_set_options(
6340 _: &StatementContext,
6341 _: AlterClusterStatement<Aug>,
6342) -> Result<StatementDesc, PlanError> {
6343 Ok(StatementDesc::new(None))
6344}
6345
/// Plans `ALTER CLUSTER ... SET (...)` / `RESET (...)`.
///
/// Produces an [`AlterClusterPlan`] recording which cluster options change
/// and which reconfiguration strategy to use. If the cluster does not exist
/// and `IF EXISTS` was given, a notice is emitted and a no-op plan returned.
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            // WORKLOAD CLASS is reserved for system users.
            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            // Validate options against the cluster's managed-ness, taking a
            // MANAGED option being set right now into account.
            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    // Any ALTER ... WITH strategy beyond `None` is
                    // feature-gated (zero-downtime reconfiguration).
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    // Non-MANUAL schedules are feature-gated.
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        // REPLICATION FACTOR is incompatible with any
                        // non-MANUAL schedule, whether being set in this
                        // statement or already configured on the cluster.
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        // Single-replica-only objects (sources/sinks) cap the
                        // total replica count, internal replicas included.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // A reconfiguration strategy doubles the internal
                        // replica set while it runs, which clusters with
                        // single-replica objects cannot tolerate.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    // Unmanaged clusters accept none of the managed-only
                    // options, and converting away from a managed cluster
                    // with a non-MANUAL schedule requires setting SCHEDULE to
                    // MANUAL explicitly.
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            // Plan any inline replica definitions (only reachable for
            // unmanaged clusters; the managed branch bailed on REPLICAS).
            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            // Translate each supplied option into an explicit Set parameter;
            // unspecified options keep their default (no-change) value.
            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // DISK is deprecated: it is rejected outright for "cc" sizes
                // (where disk is always enabled) and otherwise only warns.
                // When SIZE is not also being set, judge against the
                // cluster's current managed size.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            // WORKLOAD CLASS is reserved for system users on RESET as well.
            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    // RESET DISK is a deprecated no-op: it only emits a
                    // notice and resets nothing.
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}
6598
6599pub fn describe_alter_set_cluster(
6600 _: &StatementContext,
6601 _: AlterSetClusterStatement<Aug>,
6602) -> Result<StatementDesc, PlanError> {
6603 Ok(StatementDesc::new(None))
6604}
6605
6606pub fn plan_alter_item_set_cluster(
6607 scx: &StatementContext,
6608 AlterSetClusterStatement {
6609 if_exists,
6610 set_cluster: in_cluster_name,
6611 name,
6612 object_type,
6613 }: AlterSetClusterStatement<Aug>,
6614) -> Result<Plan, PlanError> {
6615 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6616
6617 let object_type = object_type.into();
6618
6619 match object_type {
6621 ObjectType::MaterializedView => {}
6622 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6623 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6624 }
6625 ObjectType::Table
6626 | ObjectType::View
6627 | ObjectType::Type
6628 | ObjectType::Role
6629 | ObjectType::Cluster
6630 | ObjectType::ClusterReplica
6631 | ObjectType::Secret
6632 | ObjectType::Connection
6633 | ObjectType::Database
6634 | ObjectType::Schema
6635 | ObjectType::Func
6636 | ObjectType::ContinualTask
6637 | ObjectType::NetworkPolicy => {
6638 bail_never_supported!(
6639 format!("ALTER {object_type} SET CLUSTER"),
6640 "sql/alter-set-cluster/",
6641 format!("{object_type} has no associated cluster")
6642 )
6643 }
6644 }
6645
6646 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6647
6648 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6649 Some(entry) => {
6650 let current_cluster = entry.cluster_id();
6651 let Some(current_cluster) = current_cluster else {
6652 sql_bail!("No cluster associated with {name}");
6653 };
6654
6655 if current_cluster == in_cluster.id() {
6656 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6657 } else {
6658 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6659 id: entry.id(),
6660 set_cluster: in_cluster.id(),
6661 }))
6662 }
6663 }
6664 None => {
6665 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6666 name: name.to_ast_string_simple(),
6667 object_type,
6668 });
6669
6670 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6671 }
6672 }
6673}
6674
6675pub fn describe_alter_object_rename(
6676 _: &StatementContext,
6677 _: AlterObjectRenameStatement,
6678) -> Result<StatementDesc, PlanError> {
6679 Ok(StatementDesc::new(None))
6680}
6681
6682pub fn plan_alter_object_rename(
6683 scx: &mut StatementContext,
6684 AlterObjectRenameStatement {
6685 name,
6686 object_type,
6687 to_item_name,
6688 if_exists,
6689 }: AlterObjectRenameStatement,
6690) -> Result<Plan, PlanError> {
6691 let object_type = object_type.into();
6692 match (object_type, name) {
6693 (
6694 ObjectType::View
6695 | ObjectType::MaterializedView
6696 | ObjectType::Table
6697 | ObjectType::Source
6698 | ObjectType::Index
6699 | ObjectType::Sink
6700 | ObjectType::Secret
6701 | ObjectType::Connection,
6702 UnresolvedObjectName::Item(name),
6703 ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6704 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6705 plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6706 }
6707 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6708 plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6709 }
6710 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6711 plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6712 }
6713 (object_type, name) => {
6714 unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6715 }
6716 }
6717}
6718
6719pub fn plan_alter_schema_rename(
6720 scx: &mut StatementContext,
6721 name: UnresolvedSchemaName,
6722 to_schema_name: Ident,
6723 if_exists: bool,
6724) -> Result<Plan, PlanError> {
6725 let normalized = normalize::unresolved_schema_name(name.clone())?;
6729 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6730 sql_bail!(
6731 "cannot rename schemas in the ambient database: {:?}",
6732 mz_repr::namespaces::MZ_TEMP_SCHEMA
6733 );
6734 }
6735
6736 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6737 let object_type = ObjectType::Schema;
6738 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6739 name: name.to_ast_string_simple(),
6740 object_type,
6741 });
6742 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6743 };
6744
6745 if scx
6747 .resolve_schema_in_database(&db_spec, &to_schema_name)
6748 .is_ok()
6749 {
6750 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6751 to_schema_name.clone().into_string(),
6752 )));
6753 }
6754
6755 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6757 if schema.id().is_system() {
6758 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6759 }
6760
6761 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6762 cur_schema_spec: (db_spec, schema_spec),
6763 new_schema_name: to_schema_name.into_string(),
6764 }))
6765}
6766
6767pub fn plan_alter_schema_swap<F>(
6768 scx: &mut StatementContext,
6769 name_a: UnresolvedSchemaName,
6770 name_b: Ident,
6771 gen_temp_suffix: F,
6772) -> Result<Plan, PlanError>
6773where
6774 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6775{
6776 let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6780 if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6781 {
6782 sql_bail!("cannot swap schemas that are in the ambient database");
6783 }
6784 let name_b_str = normalize::ident_ref(&name_b);
6786 if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6787 sql_bail!("cannot swap schemas that are in the ambient database");
6788 }
6789
6790 let schema_a = scx.resolve_schema(name_a.clone())?;
6791
6792 let db_spec = schema_a.database().clone();
6793 if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6794 sql_bail!("cannot swap schemas that are in the ambient database");
6795 };
6796 let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6797
6798 if schema_a.id().is_system() || schema_b.id().is_system() {
6800 bail_never_supported!("swapping a system schema".to_string())
6801 }
6802
6803 let check = |temp_suffix: &str| {
6807 let mut temp_name = ident!("mz_schema_swap_");
6808 temp_name.append_lossy(temp_suffix);
6809 scx.resolve_schema_in_database(&db_spec, &temp_name)
6810 .is_err()
6811 };
6812 let temp_suffix = gen_temp_suffix(&check)?;
6813 let name_temp = format!("mz_schema_swap_{temp_suffix}");
6814
6815 Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6816 schema_a_spec: (*schema_a.database(), *schema_a.id()),
6817 schema_a_name: schema_a.name().schema.to_string(),
6818 schema_b_spec: (*schema_b.database(), *schema_b.id()),
6819 schema_b_name: schema_b.name().schema.to_string(),
6820 name_temp,
6821 }))
6822}
6823
6824pub fn plan_alter_item_rename(
6825 scx: &mut StatementContext,
6826 object_type: ObjectType,
6827 name: UnresolvedItemName,
6828 to_item_name: Ident,
6829 if_exists: bool,
6830) -> Result<Plan, PlanError> {
6831 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6832 Ok(r) => r,
6833 Err(PlanError::MismatchedObjectType {
6835 name,
6836 is_type: ObjectType::MaterializedView,
6837 expected_type: ObjectType::View,
6838 }) => {
6839 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6840 }
6841 e => e?,
6842 };
6843
6844 match resolved {
6845 Some(entry) => {
6846 let full_name = scx.catalog.resolve_full_name(entry.name());
6847 let item_type = entry.item_type();
6848
6849 let proposed_name = QualifiedItemName {
6850 qualifiers: entry.name().qualifiers.clone(),
6851 item: to_item_name.clone().into_string(),
6852 };
6853
6854 let conflicting_type_exists;
6858 let conflicting_item_exists;
6859 if item_type == CatalogItemType::Type {
6860 conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6861 conflicting_item_exists = scx
6862 .catalog
6863 .get_item_by_name(&proposed_name)
6864 .map(|item| item.item_type().conflicts_with_type())
6865 .unwrap_or(false);
6866 } else {
6867 conflicting_type_exists = item_type.conflicts_with_type()
6868 && scx.catalog.get_type_by_name(&proposed_name).is_some();
6869 conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6870 };
6871 if conflicting_type_exists || conflicting_item_exists {
6872 sql_bail!("catalog item '{}' already exists", to_item_name);
6873 }
6874
6875 Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6876 id: entry.id(),
6877 current_full_name: full_name,
6878 to_name: normalize::ident(to_item_name),
6879 object_type,
6880 }))
6881 }
6882 None => {
6883 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6884 name: name.to_ast_string_simple(),
6885 object_type,
6886 });
6887
6888 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6889 }
6890 }
6891}
6892
6893pub fn plan_alter_cluster_rename(
6894 scx: &mut StatementContext,
6895 object_type: ObjectType,
6896 name: Ident,
6897 to_name: Ident,
6898 if_exists: bool,
6899) -> Result<Plan, PlanError> {
6900 match resolve_cluster(scx, &name, if_exists)? {
6901 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6902 id: entry.id(),
6903 name: entry.name().to_string(),
6904 to_name: ident(to_name),
6905 })),
6906 None => {
6907 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6908 name: name.to_ast_string_simple(),
6909 object_type,
6910 });
6911
6912 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6913 }
6914 }
6915}
6916
6917pub fn plan_alter_cluster_swap<F>(
6918 scx: &mut StatementContext,
6919 name_a: Ident,
6920 name_b: Ident,
6921 gen_temp_suffix: F,
6922) -> Result<Plan, PlanError>
6923where
6924 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6925{
6926 let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6927 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6928
6929 let check = |temp_suffix: &str| {
6930 let mut temp_name = ident!("mz_schema_swap_");
6931 temp_name.append_lossy(temp_suffix);
6932 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6933 Err(CatalogError::UnknownCluster(_)) => true,
6935 Ok(_) | Err(_) => false,
6937 }
6938 };
6939 let temp_suffix = gen_temp_suffix(&check)?;
6940 let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6941
6942 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6943 id_a: cluster_a.id(),
6944 id_b: cluster_b.id(),
6945 name_a: name_a.into_string(),
6946 name_b: name_b.into_string(),
6947 name_temp,
6948 }))
6949}
6950
6951pub fn plan_alter_cluster_replica_rename(
6952 scx: &mut StatementContext,
6953 object_type: ObjectType,
6954 name: QualifiedReplica,
6955 to_item_name: Ident,
6956 if_exists: bool,
6957) -> Result<Plan, PlanError> {
6958 match resolve_cluster_replica(scx, &name, if_exists)? {
6959 Some((cluster, replica)) => {
6960 ensure_cluster_is_not_managed(scx, cluster.id())?;
6961 Ok(Plan::AlterClusterReplicaRename(
6962 AlterClusterReplicaRenamePlan {
6963 cluster_id: cluster.id(),
6964 replica_id: replica,
6965 name: QualifiedReplica {
6966 cluster: Ident::new(cluster.name())?,
6967 replica: name.replica,
6968 },
6969 to_name: normalize::ident(to_item_name),
6970 },
6971 ))
6972 }
6973 None => {
6974 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6975 name: name.to_ast_string_simple(),
6976 object_type,
6977 });
6978
6979 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6980 }
6981 }
6982}
6983
6984pub fn describe_alter_object_swap(
6985 _: &StatementContext,
6986 _: AlterObjectSwapStatement,
6987) -> Result<StatementDesc, PlanError> {
6988 Ok(StatementDesc::new(None))
6989}
6990
6991pub fn plan_alter_object_swap(
6992 scx: &mut StatementContext,
6993 stmt: AlterObjectSwapStatement,
6994) -> Result<Plan, PlanError> {
6995 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6996
6997 let AlterObjectSwapStatement {
6998 object_type,
6999 name_a,
7000 name_b,
7001 } = stmt;
7002 let object_type = object_type.into();
7003
7004 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
7006 let mut attempts = 0;
7007 let name_temp = loop {
7008 attempts += 1;
7009 if attempts > 10 {
7010 tracing::warn!("Unable to generate temp id for swapping");
7011 sql_bail!("unable to swap!");
7012 }
7013
7014 let short_id = mz_ore::id_gen::temp_id();
7016 if check_fn(&short_id) {
7017 break short_id;
7018 }
7019 };
7020
7021 Ok(name_temp)
7022 };
7023
7024 match (object_type, name_a, name_b) {
7025 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
7026 plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
7027 }
7028 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
7029 plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
7030 }
7031 (ObjectType::Schema | ObjectType::Cluster, _, _) => {
7032 unreachable!("parser ensures name type matches object type")
7033 }
7034 (
7035 ObjectType::Table
7036 | ObjectType::View
7037 | ObjectType::MaterializedView
7038 | ObjectType::Source
7039 | ObjectType::Sink
7040 | ObjectType::Index
7041 | ObjectType::Type
7042 | ObjectType::Role
7043 | ObjectType::ClusterReplica
7044 | ObjectType::Secret
7045 | ObjectType::Connection
7046 | ObjectType::Database
7047 | ObjectType::Func
7048 | ObjectType::ContinualTask
7049 | ObjectType::NetworkPolicy,
7050 _,
7051 _,
7052 ) => Err(PlanError::Unsupported {
7053 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
7054 discussion_no: None,
7055 }),
7056 }
7057}
7058
7059pub fn describe_alter_retain_history(
7060 _: &StatementContext,
7061 _: AlterRetainHistoryStatement<Aug>,
7062) -> Result<StatementDesc, PlanError> {
7063 Ok(StatementDesc::new(None))
7064}
7065
7066pub fn plan_alter_retain_history(
7067 scx: &StatementContext,
7068 AlterRetainHistoryStatement {
7069 object_type,
7070 if_exists,
7071 name,
7072 history,
7073 }: AlterRetainHistoryStatement<Aug>,
7074) -> Result<Plan, PlanError> {
7075 alter_retain_history(scx, object_type.into(), if_exists, name, history)
7076}
7077
/// Shared implementation of `ALTER ... RETAIN HISTORY` for object types that
/// carry a compaction window.
///
/// `history` is `Some` for `SET (RETAIN HISTORY ...)` and `None` for a reset,
/// which restores the default logical compaction window.
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    // Only item-like objects of these kinds support RETAIN HISTORY.
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // `ALTER VIEW` on a materialized view gets a dedicated error;
            // plain views never support RETAIN HISTORY; any other mismatch
            // between the stated and actual object type is an error too.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // `None` resets to the default compaction window; otherwise the
            // option value carries the requested window duration.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            // `IF EXISTS` was specified and the object is missing: warn and
            // plan a no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
7149
7150fn alter_source_timestamp_interval(
7151 scx: &StatementContext,
7152 if_exists: bool,
7153 source_name: UnresolvedItemName,
7154 value: Option<WithOptionValue<Aug>>,
7155) -> Result<Plan, PlanError> {
7156 let object_type = ObjectType::Source;
7157 match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
7158 Some(entry) => {
7159 let full_name = scx.catalog.resolve_full_name(entry.name());
7160 if entry.item_type() != CatalogItemType::Source {
7161 sql_bail!(
7162 "\"{}\" is a {} not a {}",
7163 full_name,
7164 entry.item_type(),
7165 format!("{object_type}").to_lowercase()
7166 )
7167 }
7168
7169 match value {
7170 Some(val) => {
7171 let val = match val {
7172 WithOptionValue::Value(v) => v,
7173 _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
7174 };
7175 let duration = Duration::try_from_value(val.clone())?;
7176
7177 let min = scx.catalog.system_vars().min_timestamp_interval();
7178 let max = scx.catalog.system_vars().max_timestamp_interval();
7179 if duration < min || duration > max {
7180 return Err(PlanError::InvalidTimestampInterval {
7181 min,
7182 max,
7183 requested: duration,
7184 });
7185 }
7186
7187 Ok(Plan::AlterSourceTimestampInterval(
7188 AlterSourceTimestampIntervalPlan {
7189 id: entry.id(),
7190 value: Some(val),
7191 interval: duration,
7192 },
7193 ))
7194 }
7195 None => {
7196 let interval = scx.catalog.system_vars().default_timestamp_interval();
7197 Ok(Plan::AlterSourceTimestampInterval(
7198 AlterSourceTimestampIntervalPlan {
7199 id: entry.id(),
7200 value: None,
7201 interval,
7202 },
7203 ))
7204 }
7205 }
7206 }
7207 None => {
7208 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7209 name: source_name.to_ast_string_simple(),
7210 object_type,
7211 });
7212
7213 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
7214 }
7215 }
7216}
7217
7218pub fn describe_alter_secret_options(
7219 _: &StatementContext,
7220 _: AlterSecretStatement<Aug>,
7221) -> Result<StatementDesc, PlanError> {
7222 Ok(StatementDesc::new(None))
7223}
7224
7225pub fn plan_alter_secret(
7226 scx: &mut StatementContext,
7227 stmt: AlterSecretStatement<Aug>,
7228) -> Result<Plan, PlanError> {
7229 let AlterSecretStatement {
7230 name,
7231 if_exists,
7232 value,
7233 } = stmt;
7234 let object_type = ObjectType::Secret;
7235 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7236 Some(entry) => entry.id(),
7237 None => {
7238 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7239 name: name.to_string(),
7240 object_type,
7241 });
7242
7243 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7244 }
7245 };
7246
7247 let secret_as = query::plan_secret_as(scx, value)?;
7248
7249 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7250}
7251
7252pub fn describe_alter_connection(
7253 _: &StatementContext,
7254 _: AlterConnectionStatement<Aug>,
7255) -> Result<StatementDesc, PlanError> {
7256 Ok(StatementDesc::new(None))
7257}
7258
// Derives `AlterConnectionOptionExtracted` for the `WITH (...)` options
// accepted by `ALTER CONNECTION` (currently only `VALIDATE`).
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7260
7261pub fn plan_alter_connection(
7262 scx: &StatementContext,
7263 stmt: AlterConnectionStatement<Aug>,
7264) -> Result<Plan, PlanError> {
7265 let AlterConnectionStatement {
7266 name,
7267 if_exists,
7268 actions,
7269 with_options,
7270 } = stmt;
7271 let conn_name = normalize::unresolved_item_name(name)?;
7272 let entry = match scx.catalog.resolve_item(&conn_name) {
7273 Ok(entry) => entry,
7274 Err(_) if if_exists => {
7275 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7276 name: conn_name.to_string(),
7277 object_type: ObjectType::Sink,
7278 });
7279
7280 return Ok(Plan::AlterNoop(AlterNoopPlan {
7281 object_type: ObjectType::Connection,
7282 }));
7283 }
7284 Err(e) => return Err(e.into()),
7285 };
7286
7287 let connection = entry.connection()?;
7288
7289 if actions
7290 .iter()
7291 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7292 {
7293 if actions.len() > 1 {
7294 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7295 }
7296
7297 if !with_options.is_empty() {
7298 sql_bail!(
7299 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7300 with_options
7301 .iter()
7302 .map(|o| o.to_ast_string_simple())
7303 .join(", ")
7304 );
7305 }
7306
7307 if !matches!(connection, Connection::Ssh(_)) {
7308 sql_bail!(
7309 "{} is not an SSH connection",
7310 scx.catalog.resolve_full_name(entry.name())
7311 )
7312 }
7313
7314 return Ok(Plan::AlterConnection(AlterConnectionPlan {
7315 id: entry.id(),
7316 action: crate::plan::AlterConnectionAction::RotateKeys,
7317 }));
7318 }
7319
7320 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7321 if options.validate.is_some() {
7322 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7323 }
7324
7325 let validate = match options.validate {
7326 Some(val) => val,
7327 None => {
7328 scx.catalog
7329 .system_vars()
7330 .enable_default_connection_validation()
7331 && connection.validate_by_default()
7332 }
7333 };
7334
7335 let connection_type = match connection {
7336 Connection::Aws(_) => CreateConnectionType::Aws,
7337 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7338 Connection::Kafka(_) => CreateConnectionType::Kafka,
7339 Connection::Csr(_) => CreateConnectionType::Csr,
7340 Connection::Postgres(_) => CreateConnectionType::Postgres,
7341 Connection::Ssh(_) => CreateConnectionType::Ssh,
7342 Connection::MySql(_) => CreateConnectionType::MySql,
7343 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7344 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7345 };
7346
7347 let specified_options: BTreeSet<_> = actions
7349 .iter()
7350 .map(|action: &AlterConnectionAction<Aug>| match action {
7351 AlterConnectionAction::SetOption(option) => option.name.clone(),
7352 AlterConnectionAction::DropOption(name) => name.clone(),
7353 AlterConnectionAction::RotateKeys => unreachable!(),
7354 })
7355 .collect();
7356
7357 for invalid in INALTERABLE_OPTIONS {
7358 if specified_options.contains(invalid) {
7359 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7360 }
7361 }
7362
7363 connection::validate_options_per_connection_type(connection_type, specified_options)?;
7364
7365 let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
7367 actions.into_iter().partition_map(|action| match action {
7368 AlterConnectionAction::SetOption(option) => Either::Left(option),
7369 AlterConnectionAction::DropOption(name) => Either::Right(name),
7370 AlterConnectionAction::RotateKeys => unreachable!(),
7371 });
7372
7373 let set_options: BTreeMap<_, _> = set_options_vec
7374 .clone()
7375 .into_iter()
7376 .map(|option| (option.name, option.value))
7377 .collect();
7378
7379 let connection_options_extracted =
7383 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7384
7385 let duplicates: Vec<_> = connection_options_extracted
7386 .seen
7387 .intersection(&drop_options)
7388 .collect();
7389
7390 if !duplicates.is_empty() {
7391 sql_bail!(
7392 "cannot both SET and DROP/RESET options {}",
7393 duplicates
7394 .iter()
7395 .map(|option| option.to_string())
7396 .join(", ")
7397 )
7398 }
7399
7400 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7401 let set_options_count = mutually_exclusive_options
7402 .iter()
7403 .filter(|o| set_options.contains_key(o))
7404 .count();
7405 let drop_options_count = mutually_exclusive_options
7406 .iter()
7407 .filter(|o| drop_options.contains(o))
7408 .count();
7409
7410 if set_options_count > 0 && drop_options_count > 0 {
7412 sql_bail!(
7413 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7414 connection_type,
7415 mutually_exclusive_options
7416 .iter()
7417 .map(|option| option.to_string())
7418 .join(", ")
7419 )
7420 }
7421
7422 if set_options_count > 0 || drop_options_count > 0 {
7427 drop_options.extend(mutually_exclusive_options.iter().cloned());
7428 }
7429
7430 }
7433
7434 Ok(Plan::AlterConnection(AlterConnectionPlan {
7435 id: entry.id(),
7436 action: crate::plan::AlterConnectionAction::AlterOptions {
7437 set_options,
7438 drop_options,
7439 validate,
7440 },
7441 }))
7442}
7443
7444pub fn describe_alter_sink(
7445 _: &StatementContext,
7446 _: AlterSinkStatement<Aug>,
7447) -> Result<StatementDesc, PlanError> {
7448 Ok(StatementDesc::new(None))
7449}
7450
/// Plans `ALTER SINK`.
///
/// `ALTER SINK ... SET FROM <relation>` is implemented by re-parsing the
/// sink's original `CREATE SINK` SQL, swapping in the new source relation,
/// and re-planning the statement from scratch.
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        // `IF EXISTS` was specified and the sink is missing: warn and plan a
        // no-op.
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // Reconstruct the sink's CREATE SINK statement from its stored
            // SQL; a sink's create SQL is always a single statement.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Resolve names against the current catalog, then retarget the
            // sink at the new relation and re-plan.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            // Bump the sink version to distinguish the new definition from
            // the one it replaces.
            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}
7510
7511pub fn describe_alter_source(
7512 _: &StatementContext,
7513 _: AlterSourceStatement<Aug>,
7514) -> Result<StatementDesc, PlanError> {
7515 Ok(StatementDesc::new(None))
7517}
7518
// Derives `AlterSourceAddSubsourceOptionExtracted` for the options accepted
// by `ALTER SOURCE ... ADD SUBSOURCE` (text/exclude column lists and the
// purification `DETAILS` blob).
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7525
/// Plans `ALTER SOURCE`.
///
/// Only the `RETAIN HISTORY` and `TIMESTAMP INTERVAL` options may be SET or
/// RESET here; subsource-management actions are either rejected or must have
/// been handled during purification.
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        // `IF EXISTS` was specified and the source is missing: warn and plan
        // a no-op.
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            // NOTE(review): assumes the parser always produces at least one
            // option here; the `unwrap` panics otherwise — TODO confirm.
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                // RETAIN HISTORY cannot be combined with other options.
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            if option.name == CreateSourceOptionName::TimestampInterval {
                // TIMESTAMP INTERVAL cannot be combined with other options.
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
            }
            // All other source options are immutable after creation.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            // NOTE(review): same non-empty assumption as the SET arm above.
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                // A reset passes `None`, restoring the default window.
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            if option == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                // A reset passes `None`, restoring the default interval.
                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}
7612
7613pub fn describe_alter_system_set(
7614 _: &StatementContext,
7615 _: AlterSystemSetStatement,
7616) -> Result<StatementDesc, PlanError> {
7617 Ok(StatementDesc::new(None))
7618}
7619
7620pub fn plan_alter_system_set(
7621 _: &StatementContext,
7622 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7623) -> Result<Plan, PlanError> {
7624 let name = name.to_string();
7625 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7626 name,
7627 value: scl::plan_set_variable_to(to)?,
7628 }))
7629}
7630
7631pub fn describe_alter_system_reset(
7632 _: &StatementContext,
7633 _: AlterSystemResetStatement,
7634) -> Result<StatementDesc, PlanError> {
7635 Ok(StatementDesc::new(None))
7636}
7637
7638pub fn plan_alter_system_reset(
7639 _: &StatementContext,
7640 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7641) -> Result<Plan, PlanError> {
7642 let name = name.to_string();
7643 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7644}
7645
7646pub fn describe_alter_system_reset_all(
7647 _: &StatementContext,
7648 _: AlterSystemResetAllStatement,
7649) -> Result<StatementDesc, PlanError> {
7650 Ok(StatementDesc::new(None))
7651}
7652
7653pub fn plan_alter_system_reset_all(
7654 _: &StatementContext,
7655 _: AlterSystemResetAllStatement,
7656) -> Result<Plan, PlanError> {
7657 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7658}
7659
7660pub fn describe_alter_role(
7661 _: &StatementContext,
7662 _: AlterRoleStatement<Aug>,
7663) -> Result<StatementDesc, PlanError> {
7664 Ok(StatementDesc::new(None))
7665}
7666
7667pub fn plan_alter_role(
7668 scx: &StatementContext,
7669 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7670) -> Result<Plan, PlanError> {
7671 let option = match option {
7672 AlterRoleOption::Attributes(attrs) => {
7673 let attrs = plan_role_attributes(attrs, scx)?;
7674 PlannedAlterRoleOption::Attributes(attrs)
7675 }
7676 AlterRoleOption::Variable(variable) => {
7677 let var = plan_role_variable(variable)?;
7678 PlannedAlterRoleOption::Variable(var)
7679 }
7680 };
7681
7682 Ok(Plan::AlterRole(AlterRolePlan {
7683 id: name.id,
7684 name: name.name,
7685 option,
7686 }))
7687}
7688
7689pub fn describe_alter_table_add_column(
7690 _: &StatementContext,
7691 _: AlterTableAddColumnStatement<Aug>,
7692) -> Result<StatementDesc, PlanError> {
7693 Ok(StatementDesc::new(None))
7694}
7695
7696pub fn plan_alter_table_add_column(
7697 scx: &StatementContext,
7698 stmt: AlterTableAddColumnStatement<Aug>,
7699) -> Result<Plan, PlanError> {
7700 let AlterTableAddColumnStatement {
7701 if_exists,
7702 name,
7703 if_col_not_exist,
7704 column_name,
7705 data_type,
7706 } = stmt;
7707 let object_type = ObjectType::Table;
7708
7709 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7710
7711 let (relation_id, item_name, desc) =
7712 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7713 Some(item) => {
7714 let item_name = scx.catalog.resolve_full_name(item.name());
7716 let item = item.at_version(RelationVersionSelector::Latest);
7717 let desc = item.relation_desc().expect("table has desc").into_owned();
7718 (item.id(), item_name, desc)
7719 }
7720 None => {
7721 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7722 name: name.to_ast_string_simple(),
7723 object_type,
7724 });
7725 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7726 }
7727 };
7728
7729 let column_name = ColumnName::from(column_name.as_str());
7730 if desc.get_by_name(&column_name).is_some() {
7731 if if_col_not_exist {
7732 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7733 column_name: column_name.to_string(),
7734 object_name: item_name.item,
7735 });
7736 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7737 } else {
7738 return Err(PlanError::ColumnAlreadyExists {
7739 column_name,
7740 object_name: item_name.item,
7741 });
7742 }
7743 }
7744
7745 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7746 let column_type = scalar_type.nullable(true);
7748 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7750
7751 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7752 relation_id,
7753 column_name,
7754 column_type,
7755 raw_sql_type,
7756 }))
7757}
7758
7759pub fn describe_alter_materialized_view_apply_replacement(
7760 _: &StatementContext,
7761 _: AlterMaterializedViewApplyReplacementStatement,
7762) -> Result<StatementDesc, PlanError> {
7763 Ok(StatementDesc::new(None))
7764}
7765
7766pub fn plan_alter_materialized_view_apply_replacement(
7767 scx: &StatementContext,
7768 stmt: AlterMaterializedViewApplyReplacementStatement,
7769) -> Result<Plan, PlanError> {
7770 let AlterMaterializedViewApplyReplacementStatement {
7771 if_exists,
7772 name,
7773 replacement_name,
7774 } = stmt;
7775
7776 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7777
7778 let object_type = ObjectType::MaterializedView;
7779 let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7780 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7781 name: name.to_ast_string_simple(),
7782 object_type,
7783 });
7784 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7785 };
7786
7787 let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7788 .expect("if_exists not set");
7789
7790 if replacement.replacement_target() != Some(mv.id()) {
7791 return Err(PlanError::InvalidReplacement {
7792 item_type: mv.item_type(),
7793 item_name: scx.catalog.minimal_qualification(mv.name()),
7794 replacement_type: replacement.item_type(),
7795 replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7796 });
7797 }
7798
7799 Ok(Plan::AlterMaterializedViewApplyReplacement(
7800 AlterMaterializedViewApplyReplacementPlan {
7801 id: mv.id(),
7802 replacement_id: replacement.id(),
7803 },
7804 ))
7805}
7806
7807pub fn describe_comment(
7808 _: &StatementContext,
7809 _: CommentStatement<Aug>,
7810) -> Result<StatementDesc, PlanError> {
7811 Ok(StatementDesc::new(None))
7812}
7813
7814pub fn plan_comment(
7815 scx: &mut StatementContext,
7816 stmt: CommentStatement<Aug>,
7817) -> Result<Plan, PlanError> {
7818 const MAX_COMMENT_LENGTH: usize = 1024;
7819
7820 let CommentStatement { object, comment } = stmt;
7821
7822 if let Some(c) = &comment {
7824 if c.len() > 1024 {
7825 return Err(PlanError::CommentTooLong {
7826 length: c.len(),
7827 max_size: MAX_COMMENT_LENGTH,
7828 });
7829 }
7830 }
7831
7832 let (object_id, column_pos) = match &object {
7833 com_ty @ CommentObjectType::Table { name }
7834 | com_ty @ CommentObjectType::View { name }
7835 | com_ty @ CommentObjectType::MaterializedView { name }
7836 | com_ty @ CommentObjectType::Index { name }
7837 | com_ty @ CommentObjectType::Func { name }
7838 | com_ty @ CommentObjectType::Connection { name }
7839 | com_ty @ CommentObjectType::Source { name }
7840 | com_ty @ CommentObjectType::Sink { name }
7841 | com_ty @ CommentObjectType::Secret { name }
7842 | com_ty @ CommentObjectType::ContinualTask { name } => {
7843 let item = scx.get_item_by_resolved_name(name)?;
7844 match (com_ty, item.item_type()) {
7845 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7846 (CommentObjectId::Table(item.id()), None)
7847 }
7848 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7849 (CommentObjectId::View(item.id()), None)
7850 }
7851 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7852 (CommentObjectId::MaterializedView(item.id()), None)
7853 }
7854 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7855 (CommentObjectId::Index(item.id()), None)
7856 }
7857 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7858 (CommentObjectId::Func(item.id()), None)
7859 }
7860 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7861 (CommentObjectId::Connection(item.id()), None)
7862 }
7863 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7864 (CommentObjectId::Source(item.id()), None)
7865 }
7866 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7867 (CommentObjectId::Sink(item.id()), None)
7868 }
7869 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7870 (CommentObjectId::Secret(item.id()), None)
7871 }
7872 (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7873 (CommentObjectId::ContinualTask(item.id()), None)
7874 }
7875 (com_ty, cat_ty) => {
7876 let expected_type = match com_ty {
7877 CommentObjectType::Table { .. } => ObjectType::Table,
7878 CommentObjectType::View { .. } => ObjectType::View,
7879 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7880 CommentObjectType::Index { .. } => ObjectType::Index,
7881 CommentObjectType::Func { .. } => ObjectType::Func,
7882 CommentObjectType::Connection { .. } => ObjectType::Connection,
7883 CommentObjectType::Source { .. } => ObjectType::Source,
7884 CommentObjectType::Sink { .. } => ObjectType::Sink,
7885 CommentObjectType::Secret { .. } => ObjectType::Secret,
7886 _ => unreachable!("these are the only types we match on"),
7887 };
7888
7889 return Err(PlanError::InvalidObjectType {
7890 expected_type: SystemObjectType::Object(expected_type),
7891 actual_type: SystemObjectType::Object(cat_ty.into()),
7892 object_name: item.name().item.clone(),
7893 });
7894 }
7895 }
7896 }
7897 CommentObjectType::Type { ty } => match ty {
7898 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7899 sql_bail!("cannot comment on anonymous list or map type");
7900 }
7901 ResolvedDataType::Named { id, modifiers, .. } => {
7902 if !modifiers.is_empty() {
7903 sql_bail!("cannot comment on type with modifiers");
7904 }
7905 (CommentObjectId::Type(*id), None)
7906 }
7907 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7908 },
7909 CommentObjectType::Column { name } => {
7910 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7911 match item.item_type() {
7912 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7913 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7914 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7915 CatalogItemType::MaterializedView => {
7916 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7917 }
7918 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7919 r => {
7920 return Err(PlanError::Unsupported {
7921 feature: format!("Specifying comments on a column of {r}"),
7922 discussion_no: None,
7923 });
7924 }
7925 }
7926 }
7927 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7928 CommentObjectType::Database { name } => {
7929 (CommentObjectId::Database(*name.database_id()), None)
7930 }
7931 CommentObjectType::Schema { name } => {
7932 if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7936 sql_bail!(
7937 "cannot comment on schema {} because it is a temporary schema",
7938 mz_repr::namespaces::MZ_TEMP_SCHEMA
7939 );
7940 }
7941 (
7942 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7943 None,
7944 )
7945 }
7946 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7947 CommentObjectType::ClusterReplica { name } => {
7948 let replica = scx.catalog.resolve_cluster_replica(name)?;
7949 (
7950 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7951 None,
7952 )
7953 }
7954 CommentObjectType::NetworkPolicy { name } => {
7955 (CommentObjectId::NetworkPolicy(name.id), None)
7956 }
7957 };
7958
7959 if let Some(p) = column_pos {
7965 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7966 max_num_columns: MAX_NUM_COLUMNS,
7967 req_num_columns: p,
7968 })?;
7969 }
7970
7971 Ok(Plan::Comment(CommentPlan {
7972 object_id,
7973 sub_component: column_pos,
7974 comment,
7975 }))
7976}
7977
7978pub(crate) fn resolve_cluster<'a>(
7979 scx: &'a StatementContext,
7980 name: &'a Ident,
7981 if_exists: bool,
7982) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7983 match scx.resolve_cluster(Some(name)) {
7984 Ok(cluster) => Ok(Some(cluster)),
7985 Err(_) if if_exists => Ok(None),
7986 Err(e) => Err(e),
7987 }
7988}
7989
7990pub(crate) fn resolve_cluster_replica<'a>(
7991 scx: &'a StatementContext,
7992 name: &QualifiedReplica,
7993 if_exists: bool,
7994) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7995 match scx.resolve_cluster(Some(&name.cluster)) {
7996 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7997 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7998 None if if_exists => Ok(None),
7999 None => Err(sql_err!(
8000 "CLUSTER {} has no CLUSTER REPLICA named {}",
8001 cluster.name(),
8002 name.replica.as_str().quoted(),
8003 )),
8004 },
8005 Err(_) if if_exists => Ok(None),
8006 Err(e) => Err(e),
8007 }
8008}
8009
8010pub(crate) fn resolve_database<'a>(
8011 scx: &'a StatementContext,
8012 name: &'a UnresolvedDatabaseName,
8013 if_exists: bool,
8014) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
8015 match scx.resolve_database(name) {
8016 Ok(database) => Ok(Some(database)),
8017 Err(_) if if_exists => Ok(None),
8018 Err(e) => Err(e),
8019 }
8020}
8021
8022pub(crate) fn resolve_schema<'a>(
8023 scx: &'a StatementContext,
8024 name: UnresolvedSchemaName,
8025 if_exists: bool,
8026) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
8027 match scx.resolve_schema(name) {
8028 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
8029 Err(_) if if_exists => Ok(None),
8030 Err(e) => Err(e),
8031 }
8032}
8033
8034pub(crate) fn resolve_network_policy<'a>(
8035 scx: &'a StatementContext,
8036 name: Ident,
8037 if_exists: bool,
8038) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
8039 match scx.catalog.resolve_network_policy(&name.to_string()) {
8040 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
8041 id: policy.id(),
8042 name: policy.name().to_string(),
8043 })),
8044 Err(_) if if_exists => Ok(None),
8045 Err(e) => Err(e.into()),
8046 }
8047}
8048
8049pub(crate) fn resolve_item_or_type<'a>(
8050 scx: &'a StatementContext,
8051 object_type: ObjectType,
8052 name: UnresolvedItemName,
8053 if_exists: bool,
8054) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
8055 let name = normalize::unresolved_item_name(name)?;
8056 let catalog_item = match object_type {
8057 ObjectType::Type => scx.catalog.resolve_type(&name),
8058 ObjectType::Table
8059 | ObjectType::View
8060 | ObjectType::MaterializedView
8061 | ObjectType::Source
8062 | ObjectType::Sink
8063 | ObjectType::Index
8064 | ObjectType::Role
8065 | ObjectType::Cluster
8066 | ObjectType::ClusterReplica
8067 | ObjectType::Secret
8068 | ObjectType::Connection
8069 | ObjectType::Database
8070 | ObjectType::Schema
8071 | ObjectType::Func
8072 | ObjectType::ContinualTask
8073 | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
8074 };
8075
8076 match catalog_item {
8077 Ok(item) => {
8078 let is_type = ObjectType::from(item.item_type());
8079 if object_type == is_type {
8080 Ok(Some(item))
8081 } else {
8082 Err(PlanError::MismatchedObjectType {
8083 name: scx.catalog.minimal_qualification(item.name()),
8084 is_type,
8085 expected_type: object_type,
8086 })
8087 }
8088 }
8089 Err(_) if if_exists => Ok(None),
8090 Err(e) => Err(e.into()),
8091 }
8092}
8093
8094fn ensure_cluster_is_not_managed(
8096 scx: &StatementContext,
8097 cluster_id: ClusterId,
8098) -> Result<(), PlanError> {
8099 let cluster = scx.catalog.get_cluster(cluster_id);
8100 if cluster.is_managed() {
8101 Err(PlanError::ManagedCluster {
8102 cluster_name: cluster.name().to_string(),
8103 })
8104 } else {
8105 Ok(())
8106 }
8107}