use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
    WebhookValidation, literal, plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

/// The maximum number of columns we allow in a planned relation.
const MAX_NUM_COLUMNS: usize = 256;

/// Matches the names of replicas in managed clusters, e.g. `r1` or `r42`.
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

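/// Checks that the `PARTITION BY` columns in `partition_by` form a prefix of
/// the columns in `desc`, and that each named column has a type whose
/// encoding preserves order.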
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

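/// Describes the output of a `CREATE DATABASE` statement, which returns no
/// rows.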
pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

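/// Plans a `CREATE DATABASE` statement, e.g. `CREATE DATABASE my_db`.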
pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

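/// Plans a `CREATE SCHEMA` statement, e.g. `CREATE SCHEMA my_db.my_schema`.
///
/// The schema name may have at most two components; if the database
/// component is omitted, the active database is used.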
pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

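/// Plans a `CREATE TABLE` statement, e.g.
/// `CREATE TABLE t (a int NOT NULL, b text DEFAULT 'hello')`.
///
/// Key-producing column options and table constraints (`UNIQUE`,
/// `PRIMARY KEY`, foreign keys, check constraints) are gated behind unsafe
/// feature flags.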
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // Columns added at a later relation version (i.e. via
            // `ALTER TABLE ... ADD COLUMN`) are not part of the table's
            // original set of column names.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Plan the default expression eagerly so that any errors
                    // surface now, but record the original AST.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // Versioned columns are added to the relation description later, at
        // their recorded version.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A UNIQUE constraint only forms a key if all
                                // of its columns are NOT NULL or the
                                // constraint is NULLS NOT DISTINCT.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name, unless
    // `IF NOT EXISTS` was specified.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options are planned against the relation's original (root)
    // version.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

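/// Plans a webhook source (or webhook-backed table), e.g.
/// `CREATE SOURCE my_webhook FROM WEBHOOK BODY FORMAT JSON`.
///
/// The resulting relation always contains a `body` column, plus any header
/// columns requested via `INCLUDE HEADERS` or `INCLUDE HEADER ... AS ...`.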
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    // Webhook sources must run on a cluster with at most one replica, unless
    // multi-replica sources are enabled.
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved the cluster above, so we ignore it here.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // A validation expression that does not reference any part of the
        // request is not useful.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation must be deterministic, so reject unmaterializable
        // functions other than the current timestamp.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // The request body is always the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a map of the headers, optionally filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map individually requested headers to their own columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // The column names and types must stay in lockstep.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate that the column names are unique and within limits.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name.
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Webhook sources always use the epoch-milliseconds timeline.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // The cluster is recorded on the source plan itself, below.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

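/// Plans a `CREATE SOURCE` statement, e.g.
/// `CREATE SOURCE s FROM KAFKA CONNECTION k (TOPIC 'events') FORMAT JSON`.
///
/// The statement is expected to have been purified already; in particular,
/// any referenced subsources must have been cleared.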
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced, the `ENVELOPE`, `FORMAT`, and
    // `INCLUDE` options belong on the tables created from the source, not on
    // the source itself.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // PRIMARY KEY NOT ENFORCED lets the user assert a key that we do not
        // verify; it is gated behind a feature flag.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // CDC sources run on their own external timeline; everything else uses
    // the epoch-milliseconds timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

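/// Plans the connection-specific portion of a `CREATE SOURCE` statement by
/// dispatching on the upstream system: Kafka, Postgres, SQL Server, MySQL,
/// or a load generator.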
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

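/// Plans a load generator source connection, e.g. from
/// `CREATE SOURCE lg FROM LOAD GENERATOR COUNTER (TICK INTERVAL '1s')`.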
fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

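/// Plans a MySQL source connection. The `DETAILS` option carries
/// hex-encoded, protobuf-serialized state computed during purification.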
fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // TEXT COLUMNS and EXCLUDE COLUMNS are handled during purification,
        // so planning ignores them here.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

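/// Plans a SQL Server source connection. As with MySQL, the `DETAILS` option
/// carries hex-encoded, protobuf-serialized state computed during
/// purification.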
fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

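/// Plans a Postgres source connection. The publication and its serialized
/// details are recorded in the statement's options during purification.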
fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // TEXT COLUMNS and EXCLUDE COLUMNS are handled during purification,
        // so planning ignores them here.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

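/// Plans a Kafka source connection, including any requested metadata
/// columns, e.g.
/// `CREATE SOURCE s FROM KAFKA CONNECTION k (TOPIC 't') INCLUDE PARTITION, OFFSET`.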
fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        // START TIMESTAMP is resolved into a START OFFSET during purification.
        start_timestamp: _,
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated, would
        // result in a runtime error for the source.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // INCLUDE KEY is handled by the key envelope, not as a
                // metadata column.
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

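/// Computes the relation description, planned envelope, and data encoding
/// for a source (or source-fed table) from its `FORMAT` and `ENVELOPE`
/// clauses.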
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If a format is specified, the source must produce a single
            // column of raw bytes for the format decoder to consume.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // For Kafka sources with no envelope, a record's key may be
            // absent, so the key columns must be marked nullable.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // KEY VALUE load generators are the only sources that accept INCLUDE KEY
    // without a specified key encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    // Plan the envelope, checking its compatibility with the format and key.
    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if the key is not
            // already included.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // Choose the upsert style based on the value decode error policy.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            // ENVELOPE MATERIALIZE only supports Avro-encoded values.
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

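/// Plans the relation description for a source export (a subsource or a
/// table created from a source) from user-specified columns and constraints.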
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // As in `plan_create_table`: a UNIQUE
                                // constraint only forms a key if all of its
                                // columns are NOT NULL or the constraint is
                                // NULLS NOT DISTINCT.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

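/// Plans a `CREATE SUBSOURCE` statement. Subsources are generated during
/// purification of `CREATE SOURCE` (or `ALTER SOURCE ... ADD SUBSOURCE`)
/// rather than written directly by users; a subsource either exports a
/// single upstream reference or tracks the progress of its parent source.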
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // A subsource is either a progress collection or an export of a specific
    // upstream reference, never both.
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // Subsources of ingestions cannot be created if the new source table
        // syntax is forced.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        // Decode the hex-encoded, protobuf-serialized details produced
        // during purification.
        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources do not apply a format or envelope of their own.
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);

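/// Plans a `CREATE TABLE ... FROM SOURCE` statement, e.g.
/// `CREATE TABLE t FROM SOURCE s (REFERENCE upstream_table)`.
///
/// This is the table-based successor to subsources: each table hangs off an
/// existing source and carries its own format, envelope, and metadata
/// configuration.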
1735pub fn plan_create_table_from_source(
1736 scx: &StatementContext,
1737 stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739 if !scx.catalog.system_vars().enable_create_table_from_source() {
1740 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741 }
1742
1743 let CreateTableFromSourceStatement {
1744 name,
1745 columns,
1746 constraints,
1747 if_not_exists,
1748 source,
1749 external_reference,
1750 envelope,
1751 format,
1752 include_metadata,
1753 with_options,
1754 } = &stmt;
1755
1756 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758 let TableFromSourceOptionExtracted {
1759 text_columns,
1760 exclude_columns,
1761 retain_history,
1762 partition_by,
1763 details,
1764 seen: _,
1765 } = with_options.clone().try_into()?;
1766
1767 let source_item = scx.get_item_by_resolved_name(source)?;
1768 let ingestion_id = source_item.id();
1769
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // The key is handled by the key envelope, not as a
                        // metadata column.
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    // If the user declared columns or constraints, plan the relation
    // description from those; otherwise derive the default key and value
    // descriptions from the source connection.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // A CDCv2 envelope means the source provides its own timestamps, so it
    // lives on its own external timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}

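// The full set of options accepted by `LOAD GENERATOR` sources. Which subset
// is valid for a given generator is enforced by `ensure_only_valid_options`.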
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);

impl LoadGeneratorOptionExtracted {
    pub(super) fn ensure_only_valid_options(
        &self,
        loadgen: &ast::LoadGenerator,
    ) -> Result<(), PlanError> {
        use mz_sql_parser::ast::LoadGeneratorOptionName::*;

        let mut options = self.seen.clone();

        let permitted_options: &[_] = match loadgen {
            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
            ast::LoadGenerator::KeyValue => &[
                TickInterval,
                Keys,
                SnapshotRounds,
                TransactionalSnapshot,
                ValueSize,
                Seed,
                Partitions,
                BatchSize,
            ],
        };

        for o in permitted_options {
            options.remove(o);
        }

        if !options.is_empty() {
            sql_bail!(
                "{} load generators do not support {} values",
                loadgen,
                options.iter().join(", ")
            )
        }

        Ok(())
    }
}

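/// Converts a parsed `LOAD GENERATOR` clause and its options into a
/// [`LoadGenerator`], enforcing feature flags and per-generator option rules.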
pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    extracted.ensure_only_valid_options(loadgen)?;

    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
            LoadGenerator::Datums
        }
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,

                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
                    }
                };
            }

            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("BATCH SIZE cannot be larger than KEYS")
            }

            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}

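/// Checks that a Debezium-encoded value has the expected `before` and `after`
/// record columns, returning the index of each (`before` is optional).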
fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
    let before = value_desc.get_by_name(&"before".into());
    let (after_idx, after_ty) = value_desc
        .get_by_name(&"after".into())
        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
    let before_idx = if let Some((before_idx, before_ty)) = before {
        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
            sql_bail!("'before' column must be of type record");
        }
        if before_ty != after_ty {
            sql_bail!("'before' type differs from 'after' column");
        }
        Some(before_idx)
    } else {
        None
    };
    Ok((before_idx, after_idx))
}

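/// Determines the key and value encodings for a source from its `FORMAT` or
/// `KEY FORMAT .. VALUE FORMAT` clause, validating them against the envelope.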
fn get_encoding(
    scx: &StatementContext,
    format: &FormatSpecifier<Aug>,
    envelope: &ast::SourceEnvelope,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let encoding = match format {
        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
        FormatSpecifier::KeyValue { key, value } => {
            let key = {
                let encoding = get_encoding_inner(scx, key)?;
                Some(encoding.key.unwrap_or(encoding.value))
            };
            let value = get_encoding_inner(scx, value)?.value;
            SourceDataEncoding { key, value }
        }
    };

    let requires_keyvalue = matches!(
        envelope,
        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
    );
    let is_keyvalue = encoding.key.is_some();
    if requires_keyvalue && !is_keyvalue {
        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
    }

    Ok(encoding)
}

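/// Resolves the cluster for a source or sink. If `in_cluster` is unset, the
/// active cluster is resolved and written back into `in_cluster` so that the
/// normalized statement names it explicitly.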
fn source_sink_cluster_config<'a, 'ctx>(
    scx: &'a StatementContext<'ctx>,
    in_cluster: &mut Option<ResolvedClusterName>,
) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
    let cluster = match in_cluster {
        None => {
            let cluster = scx.catalog.resolve_cluster(None)?;
            *in_cluster = Some(ResolvedClusterName {
                id: cluster.id(),
                print_name: None,
            });
            cluster
        }
        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
    };

    Ok(cluster)
}

generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));

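/// An Avro schema (or key/value schema pair) together with the schema
/// registry connection it was fetched from, if any.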
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    pub confluent_wire_format: bool,
}

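/// Determines the encoding for a single `FORMAT` specifier. Only Avro formats
/// can produce a key encoding here; all other formats yield a value encoding.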
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}

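/// Determines the key envelope requested by the `INCLUDE KEY` metadata item,
/// if any, given the source's declared key encoding.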
fn get_key_envelope(
    included_items: &[SourceIncludeMetadata],
    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
    key_envelope_no_encoding: bool,
) -> Result<KeyEnvelope, PlanError> {
    let key_definition = included_items
        .iter()
        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
        match (alias, encoding.and_then(|e| e.key.as_ref())) {
            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
            (Some(name), _) if key_envelope_no_encoding => {
                Ok(KeyEnvelope::Named(name.as_str().to_string()))
            }
            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
            (_, None) => {
                sql_bail!(
                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
                        got bare FORMAT"
                );
            }
        }
    } else {
        Ok(KeyEnvelope::None)
    }
}

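/// Returns the key envelope to use when `INCLUDE KEY` is given without an
/// alias: composite encodings are flattened into the row, while single-column
/// encodings get a column named `key`.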
fn get_unnamed_key_envelope(
    key: Option<&DataEncoding<ReferencedConnection>>,
) -> Result<KeyEnvelope, PlanError> {
    let is_composite = match key {
        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
        Some(
            DataEncoding::Avro(_)
            | DataEncoding::Csv(_)
            | DataEncoding::Protobuf(_)
            | DataEncoding::Regex { .. },
        ) => true,
        None => false,
    };

    if is_composite {
        Ok(KeyEnvelope::Flattened)
    } else {
        Ok(KeyEnvelope::Named("key".to_string()))
    }
}

pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

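/// Plans a view definition, returning the qualified name and the planned
/// [`View`].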
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}

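/// Plans `CREATE [OR REPLACE] VIEW`, handling `IF EXISTS` behavior and
/// validating that a replaced view is not a dependency of its replacement.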
pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        if let Some(id) = maybe_item_to_drop {
            // Reject the replacement if the new view depends on the view it
            // would replace.
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    // Unless the new view replaces an existing item, error out if an item
    // with the same name already exists.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

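/// Plans `CREATE MATERIALIZED VIEW`, including its refresh schedule, non-null
/// assertions, retain-history window, and `IF EXISTS`/replacement handling.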
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let replacement_target = if let Some(target_name) = &stmt.replacing {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }
        Some(target.id())
    } else {
        None
    };

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // The expression must const-fold to a literal timestamp.
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        // Months have a variable number of days, so no fixed
                        // refresh period can be derived from them.
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // As above, the expression must const-fold to a literal
                    // timestamp.
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // The planning context may ask us to downgrade IF EXISTS errors to skips.
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.into(),
                cascade,
            )?;

            if let Some(id) = replace_id {
                // Reject the replacement if the new definition depends on the
                // materialized view it would replace.
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    if let Some(id) = replacement_target {
        dependencies.insert(id);
    }

    // Unless the new materialized view replaces an existing item, error out
    // if an item with the same name already exists.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);

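/// Plans `CREATE CONTINUAL TASK`: each INSERT/DELETE statement is planned as
/// a query, and the statements' plans are unioned (with deletes negated) into
/// a single dataflow expression.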
pub fn plan_create_continual_task(
    scx: &StatementContext,
    mut stmt: CreateContinualTaskStatement<Aug>,
) -> Result<Plan, PlanError> {
    match &stmt.sugar {
        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
        }
        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
        }
    };
    let cluster_id = match &stmt.in_cluster {
        None => scx.catalog.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;

    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;

    // If the user declared columns, plan the relation description from them;
    // otherwise it is inferred below from the input or the first statement.
    let mut desc = match stmt.columns {
        None => None,
        Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
            for col in columns.iter() {
                desc_columns.push((
                    normalize::column_name(col.name.clone()),
                    SqlColumnType {
                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
                        nullable: false,
                    },
                ));
            }
            Some(RelationDesc::from_names_and_types(desc_columns))
        }
    };
    let input = scx.get_item_by_resolved_name(&stmt.input)?;
    match input.item_type() {
        CatalogItemType::ContinualTask
        | CatalogItemType::Table
        | CatalogItemType::MaterializedView
        | CatalogItemType::Source => {}
        CatalogItemType::Sink
        | CatalogItemType::View
        | CatalogItemType::Index
        | CatalogItemType::Type
        | CatalogItemType::Func
        | CatalogItemType::Secret
        | CatalogItemType::Connection => {
            sql_bail!(
                "CONTINUAL TASK cannot use {} as an input",
                input.item_type()
            );
        }
    }

    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
    let ct_name = stmt.name;
    let placeholder_id = match &ct_name {
        ResolvedItemName::ContinualTask { id, name } => {
            let desc = match desc.as_ref().cloned() {
                Some(x) => x,
                None => {
                    let input_name = scx.catalog.resolve_full_name(input.name());
                    input.desc(&input_name)?.into_owned()
                }
            };
            qcx.ctes.insert(
                *id,
                CteDesc {
                    name: name.item.clone(),
                    desc,
                },
            );
            Some(*id)
        }
        _ => None,
    };

    let mut exprs = Vec::new();
    for (idx, stmt) in stmt.stmts.iter().enumerate() {
        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
        let query::PlannedRootQuery {
            expr,
            desc: desc_query,
            finishing,
            scope: _,
        } = query::plan_ct_query(&mut qcx, query)?;
        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
            &finishing,
            expr.arity()
        ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
        let expr = match desc.as_mut() {
            None => {
                desc = Some(desc_query);
                expr
            }
            Some(desc) => {
                if desc_query.arity() > desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more expressions than target columns",
                        idx
                    );
                }
                if desc_query.arity() < desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more target columns than expressions",
                        idx
                    );
                }
                // Cast each statement's output to the declared column types.
                let target_types = desc.iter_types().map(|x| &x.scalar_type);
                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
                let expr = expr.map_err(|e| {
                    sql_err!(
                        "statement {}: column {} is of type {} but expression is of type {}",
                        idx,
                        desc.get_name(e.column).quoted(),
                        qcx.humanize_scalar_type(&e.target_type, false),
                        qcx.humanize_scalar_type(&e.source_type, false),
                    )
                })?;

                // If any statement can produce NULLs in a column, the overall
                // description must mark that column as nullable.
                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
                if updated {
                    let new_types = zip_types().map(|(ct, q)| {
                        let mut ct = ct.clone();
                        if q.nullable {
                            ct.nullable = true;
                        }
                        ct
                    });
                    *desc = RelationDesc::from_names_and_types(
                        desc.iter_names().cloned().zip_eq(new_types),
                    );
                }

                expr
            }
        };
        match stmt {
            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
        }
    }
    // Union the statements' plans together (deletions were negated above).
    let expr = exprs
        .into_iter()
        .reduce(|acc, expr| acc.union(expr))
        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = match &ct_name {
        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
        ResolvedItemName::ContinualTask { name, .. } => {
            let name = scx.allocate_qualified_name(name.clone())?;
            let full_name = scx.catalog.resolve_full_name(&name);
            let partial_name = PartialItemName::from(full_name.clone());
            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
                return Err(PlanError::ItemAlreadyExists {
                    name: full_name.to_string(),
                    item_type: item.item_type(),
                });
            }
            name
        }
        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
        name,
        placeholder_id,
        desc,
        input_id: input.global_id(),
        with_snapshot: snapshot.unwrap_or(true),
        continual_task: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            replacement_target: None,
            cluster_id,
            non_null_assertions: Vec::new(),
            compaction_window: None,
            refresh_schedule: None,
            as_of,
        },
    }))
}

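/// Rewrites a continual task INSERT or DELETE statement into an equivalent
/// SELECT query against the task, or returns `None` if the statement uses a
/// feature (e.g. RETURNING or USING) that cannot be expressed this way.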
fn continual_task_query<'a>(
    ct_name: &ResolvedItemName,
    stmt: &'a ast::ContinualTaskStmt<Aug>,
) -> Option<ast::Query<Aug>> {
    match stmt {
        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
            table_name: _,
            columns,
            source,
            returning,
        }) => {
            if !columns.is_empty() || !returning.is_empty() {
                return None;
            }
            match source {
                ast::InsertSource::Query(query) => Some(query.clone()),
                ast::InsertSource::DefaultValues => None,
            }
        }
        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
            table_name: _,
            alias,
            using,
            selection,
        }) => {
            if !using.is_empty() {
                return None;
            }
            // Construct a `SELECT *` query against the task itself, filtered
            // by the DELETE's WHERE clause.
            let from = ast::TableWithJoins {
                relation: ast::TableFactor::Table {
                    name: ct_name.clone(),
                    alias: alias.clone(),
                },
                joins: Vec::new(),
            };
            let select = ast::Select {
                from: vec![from],
                selection: selection.clone(),
                distinct: None,
                projection: vec![ast::SelectItem::Wildcard],
                group_by: Vec::new(),
                having: None,
                qualify: None,
                options: Vec::new(),
            };
            let query = ast::Query {
                ctes: ast::CteBlock::Simple(Vec::new()),
                body: ast::SetExpr::Select(Box::new(select)),
                order_by: Vec::new(),
                limit: None,
                offset: None,
            };
            Some(query)
        }
    }
}

generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));

pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);

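/// Plans `CREATE SINK`, erroring if an item with the same name already exists
/// (unless `IF NOT EXISTS` was specified) before delegating to `plan_sink`.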
pub fn plan_create_sink(
    scx: &StatementContext,
    stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let Some(name) = stmt.name.clone() else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    plan_sink(scx, stmt)
}

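/// Plans the body of a sink statement without checking for name collisions;
/// `plan_create_sink` performs that check first.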
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let envelope = match envelope {
        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
        None => sql_bail!("ENVELOPE clause is required"),
    };

    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;

    {
        use CatalogItemType::*;
        match from.item_type() {
            Table | Source | MaterializedView | ContinualTask => {}
            Sink | View | Index | Type | Func | Secret | Connection => {
                let name = scx.catalog.minimal_qualification(from.name());
                return Err(PlanError::InvalidSinkFrom {
                    name: name.to_string(),
                    item_type: from.item_type(),
                });
            }
        }
    }

    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }
    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key: Some(key), .. }
        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
            let key_columns = key
                .key_columns
                .clone()
                .into_iter()
                .map(normalize::column_name)
                .collect::<Vec<_>>();
            let mut uniq = BTreeSet::new();
            for col in key_columns.iter() {
                if !uniq.insert(col) {
                    sql_bail!("duplicate column referenced in KEY: {}", col);
                }
            }
            let indices = key_columns
                .iter()
                .map(|col| -> anyhow::Result<usize> {
                    let name_idx =
                        desc.get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        return Err(
                            sql_err!("column referenced in KEY is ambiguous: {}", col).into()
                        );
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;
            let is_valid_key = desc
                .typ()
                .keys
                .iter()
                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));

            if !is_valid_key && envelope == SinkEnvelope::Upsert {
                if key.not_enforced {
                    scx.catalog
                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                            key: key_columns.clone(),
                            name: name.item.clone(),
                        })
                } else {
                    return Err(PlanError::UpsertSinkWithInvalidKey {
                        name: from_name.full_name_str(),
                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                        valid_keys: desc
                            .typ()
                            .keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    .map(|col| desc.get_name(*col).as_str().into())
                                    .collect()
                            })
                            .collect(),
                    });
                }
            }
            Some(indices)
        }
        CreateSinkConnection::Kafka { key: None, .. }
        | CreateSinkConnection::Iceberg { key: None, .. } => None,
    };

    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                SqlScalarType::Map { value_type, .. }
                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

    let relation_key_indices = desc.typ().keys.get(0).cloned();

    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = SqlRelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
        )?,
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            options,
            ..
        } => iceberg_sink_builder(
            scx,
            connection,
            aws_connection,
            options,
            relation_key_indices,
            key_desc_and_indices,
        )?,
    };

    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
        commit_interval: _,
    } = with_options.try_into()?;

    let with_snapshot = snapshot.unwrap_or(true);
    let version = version.unwrap_or(0);

    // Resolve the cluster first: `source_sink_cluster_config` fills in
    // `stmt.in_cluster`, which must happen before the statement is normalized
    // into `create_sql`.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}

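/// Builds the error returned when a user-declared key constraint conflicts
/// with the keys implied by the underlying relation.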
fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");

    let existing_keys = desc
        .typ()
        .keys
        .iter()
        .map(|key_columns| {
            key_columns
                .iter()
                .map(|col| desc.get_name(*col).as_str())
                .join(", ")
        })
        .join(", ");

    sql_err!(
        "Key constraint ({}) conflicts with existing key ({})",
        user_keys,
        existing_keys
    )
}

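/// Confluent Schema Registry configuration options extracted from a sink's
/// format clause, including schema fullnames, doc comments, and compatibility
/// levels.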
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    pub(crate) null_defaults: bool,
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}

impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}

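/// Builds the storage connection description for an Iceberg sink, validating
/// the referenced catalog and AWS connections and the required TABLE and
/// NAMESPACE options.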
fn iceberg_sink_builder(
    scx: &StatementContext,
    catalog_connection: ResolvedItemName,
    aws_connection: ResolvedItemName,
    options: Vec<IcebergSinkConfigOption<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
    let catalog_connection_id = catalog_connection_item.id();
    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
    let aws_connection_id = aws_connection_item.id();
    if !matches!(
        catalog_connection_item.connection()?,
        Connection::IcebergCatalog(_)
    ) {
        sql_bail!(
            "{} is not an iceberg catalog connection",
            scx.catalog
                .resolve_full_name(catalog_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
        sql_bail!(
            "{} is not an AWS connection",
            scx.catalog
                .resolve_full_name(aws_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    let IcebergSinkConfigOptionExtracted {
        table,
        namespace,
        seen: _,
    }: IcebergSinkConfigOptionExtracted = options.try_into()?;

    let Some(table) = table else {
        sql_bail!("Iceberg sink must specify TABLE");
    };
    let Some(namespace) = namespace else {
        sql_bail!("Iceberg sink must specify NAMESPACE");
    };

    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
        catalog_connection_id,
        catalog_connection: catalog_connection_id,
        aws_connection_id,
        aws_connection: aws_connection_id,
        table,
        namespace,
        relation_key_indices,
        key_desc_and_indices,
    }))
}

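/// Builds the storage connection description for a Kafka sink: validates the
/// referenced connection, extracts topic and ID-style options, and derives
/// the key/value formats (including generated Avro schemas).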
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

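    // PARTITION BY accepts a scalar expression over the sink's columns that
    // is assignment-cast to uint64, e.g. (illustrative):
    //
    //   CREATE SINK ... (TOPIC 't', PARTITION BY = kafka_murmur2(id::text)) ...
    //
    // Under ENVELOPE DEBEZIUM only key columns may be referenced; other
    // column references are rejected via `error_if_referenced` below.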
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated()?;

            Some(expr)
        }
        _ => None,
    };

    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}

pub fn describe_create_index(
    _: &StatementContext,
    _: CreateIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

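/// Plans `CREATE INDEX`. When no key is given, the relation's default key is
/// used and a name is derived from it, e.g. (illustrative):
///
/// ```sql
/// CREATE INDEX ON active_customers;          -- active_customers_primary_idx
/// CREATE INDEX ON active_customers (geo_id); -- active_customers_geo_id_idx
/// ```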
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;
    let on_item_type = on.item_type();

    if !matches!(
        on_item_type,
        CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Source
            | CatalogItemType::Table
    ) {
        sql_bail!(
            "index cannot be created on {} because it is a {}",
            on_name.full_name_str(),
            on.item_type()
        )
    }

    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            on.desc(&scx.catalog.resolve_full_name(on.name()))?
                .typ()
                .default_key()
                .iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            idx_name.item += "_primary_idx";
        } else {
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}

pub fn describe_create_type(
    _: &StatementContext,
    _: CreateTypeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

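/// Plans `CREATE TYPE`. Element, key, and value types must themselves be
/// named types, e.g. (illustrative):
///
/// ```sql
/// CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
/// CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);
/// ```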
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}

generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);

#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}

#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    pub password: Option<Password>,
    pub scram_iterations: Option<NonZeroU32>,
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}

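/// Folds a list of role attribute options into `PlannedRoleAttributes`,
/// rejecting conflicting or redundant combinations, e.g. (illustrative)
/// `CREATE ROLE r WITH LOGIN NOLOGIN` or repeating `PASSWORD`.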
fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}

#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}

impl PlannedRoleVariable {
    pub fn name(&self) -> &str {
        match self {
            PlannedRoleVariable::Set { name, .. } => name,
            PlannedRoleVariable::Reset { name } => name,
        }
    }
}

fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
    let plan = match variable {
        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
            name: name.to_string(),
            value: scl::plan_set_variable_to(value)?,
        },
        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
            name: name.to_string(),
        },
    };
    Ok(plan)
}

pub fn describe_create_role(
    _: &StatementContext,
    _: CreateRoleStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_role(
    scx: &StatementContext,
    CreateRoleStatement { name, options }: CreateRoleStatement,
) -> Result<Plan, PlanError> {
    let attributes = plan_role_attributes(options, scx)?;
    Ok(Plan::CreateRole(CreateRolePlan {
        name: normalize::ident(name),
        attributes: attributes.into(),
    }))
}

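/// Plans `CREATE NETWORK POLICY`. Every rule must supply a direction, an
/// action, and an address, e.g. (illustrative):
///
/// ```sql
/// CREATE NETWORK POLICY office_policy (
///     RULES (
///         minnesota (ACTION = 'allow', DIRECTION = 'ingress', ADDRESS = '2.3.4.5/32')
///     )
/// );
/// ```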
pub fn plan_create_network_policy(
    ctx: &StatementContext,
    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when creating network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;
        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }

    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
        name: normalize::ident(name),
        rules,
    }))
}

pub fn plan_alter_network_policy(
    ctx: &StatementContext,
    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;

    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when altering network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;

        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }
    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
        id: policy.id(),
        name: normalize::ident(name),
        rules,
    }))
}

pub fn describe_create_cluster(
    _: &StatementContext,
    _: CreateClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);

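/// Plans `CREATE CLUSTER`, then, for managed clusters, checks that the plan
/// round-trips: the plan is unplanned back to SQL (the same path that backs
/// `SHOW CREATE` output), re-parsed, re-planned, and required to be equal to
/// the original. This catches `plan`/`unplan` drifting out of sync.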
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}

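/// A cluster is managed unless `REPLICAS` is given explicitly, e.g.
/// (illustrative):
///
/// ```sql
/// CREATE CLUSTER managed (SIZE = '100cc', REPLICATION FACTOR = 2);
/// CREATE CLUSTER unmanaged (REPLICAS (r1 (SIZE = '100cc')));
/// ```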
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    let managed = managed.unwrap_or_else(|| replicas.is_none());

    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}

pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_reduce_reduction: _,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}

generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);

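/// Validates replica options into either an orchestrated config (`SIZE` plus
/// optional `AVAILABILITY ZONE` / `BILLED AS`) or an unorchestrated one
/// (explicit `*CTL ADDRESSES`, behind a feature flag); mixing the two shapes
/// is rejected. E.g. (illustrative):
///
/// ```sql
/// CREATE CLUSTER REPLICA c.r1 (SIZE = '100cc');
/// ```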
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}

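/// Derives the introspection config: an unspecified interval defaults to
/// `DEFAULT_REPLICA_LOGGING_INTERVAL`, while an interval that resolves to
/// `None` (e.g. an explicit zero) disables introspection, in which case
/// `INTROSPECTION DEBUGGING` is rejected.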
fn plan_compute_replica_config(
    introspection_interval: Option<OptionalDuration>,
    introspection_debugging: bool,
) -> Result<ComputeReplicaConfig, PlanError> {
    let introspection_interval = introspection_interval
        .map(|OptionalDuration(i)| i)
        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
    let introspection = match introspection_interval {
        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
            interval,
            debugging: introspection_debugging,
        }),
        None if introspection_debugging => {
            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
        }
        None => None,
    };
    let compute = ComputeReplicaConfig { introspection };
    Ok(compute)
}

fn unplan_compute_replica_config(
    compute_replica_config: ComputeReplicaConfig,
) -> (Option<OptionalDuration>, bool) {
    match compute_replica_config.introspection {
        Some(ComputeReplicaIntrospectionConfig {
            debugging,
            interval,
        }) => (Some(OptionalDuration(Some(interval))), debugging),
        None => (Some(OptionalDuration(None)), false),
    }
}

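/// Converts the parsed `SCHEDULE` option into a `ClusterSchedule`, e.g.
/// (illustrative):
///
/// ```sql
/// CREATE CLUSTER c (SIZE = '100cc',
///     SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour'));
/// ```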
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            if interval.months != 0 {
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}

fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
    match schedule {
        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
        ClusterSchedule::Refresh {
            hydration_time_estimate,
        } => {
            let interval = Interval::from_duration(&hydration_time_estimate)
                .expect("planning ensured that this is convertible back to Interval");
            let interval_value = literal::unplan_interval(&interval);
            ClusterScheduleOptionValue::Refresh {
                hydration_time_estimate: Some(interval_value),
            }
        }
    }
}

pub fn describe_create_cluster_replica(
    _: &StatementContext,
    _: CreateClusterReplicaStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}

pub fn describe_create_secret(
    _: &StatementContext,
    _: CreateSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

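/// Plans `CREATE SECRET`. The secret's value is redacted in the recorded
/// `create_sql`, e.g. (illustrative) `CREATE SECRET pw AS 'hunter2'` is
/// stored as `CREATE SECRET pw AS '********'`.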
pub fn plan_create_secret(
    scx: &StatementContext,
    stmt: CreateSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSecretStatement {
        name,
        if_not_exists,
        value,
    } = &stmt;

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
    let mut create_sql_statement = stmt.clone();
    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
    let create_sql =
        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
    let secret_as = query::plan_secret_as(scx, value.clone())?;

    let secret = Secret {
        create_sql,
        secret_as,
    };

    Ok(Plan::CreateSecret(CreateSecretPlan {
        name,
        secret,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_connection(
    _: &StatementContext,
    _: CreateConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(CreateConnectionOption, (Validate, bool));

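/// Plans `CREATE CONNECTION`, e.g. (illustrative):
///
/// ```sql
/// CREATE CONNECTION kafka_conn TO KAFKA (
///     BROKER 'broker:9092',
///     SECURITY PROTOCOL = 'PLAINTEXT'
/// ) WITH (VALIDATE = true);
/// ```
///
/// For SSH connections, the generated public keys are written back into the
/// recorded statement so that `SHOW CREATE` can display them.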
pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}

fn plan_drop_database(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedDatabaseName,
    cascade: bool,
) -> Result<Option<DatabaseId>, PlanError> {
    Ok(match resolve_database(scx, name, if_exists)? {
        Some(database) => {
            if !cascade && database.has_schemas() {
                sql_bail!(
                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
                    name,
                );
            }
            Some(database.id())
        }
        None => None,
    })
}

pub fn describe_drop_objects(
    _: &StatementContext,
    _: DropObjectsStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}

fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        cluster.bound_objects().iter().any(|id| {
            let item = scx.catalog.get_item(id);
            matches!(
                item.item_type(),
                CatalogItemType::Sink | CatalogItemType::Source
            )
        })
    }
}

fn plan_drop_cluster_replica(
    scx: &StatementContext,
    if_exists: bool,
    name: &QualifiedReplica,
) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
}

fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}

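/// Reports whether `dep` blocks a non-CASCADE drop. Dropping a type is
/// blocked by any dependent, while for other object types every dependent
/// except an index blocks the drop (indexes are simply dropped alongside
/// their parent).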
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        _ => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            CatalogItemType::Index => false,
        },
    }
}

pub fn describe_alter_index_options(
    _: &StatementContext,
    _: AlterIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_drop_owned(
    _: &StatementContext,
    _: DropOwnedStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

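/// Plans `DROP OWNED BY`, which drops all objects owned by the given roles
/// and revokes any privileges granted to them, e.g. (illustrative):
///
/// ```sql
/// DROP OWNED BY joe CASCADE;
/// ```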
5682pub fn plan_drop_owned(
5683 scx: &StatementContext,
5684 drop: DropOwnedStatement<Aug>,
5685) -> Result<Plan, PlanError> {
5686 let cascade = drop.cascade();
5687 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5688 let mut drop_ids = Vec::new();
5689 let mut privilege_revokes = Vec::new();
5690 let mut default_privilege_revokes = Vec::new();
5691
5692 fn update_privilege_revokes(
5693 object_id: SystemObjectId,
5694 privileges: &PrivilegeMap,
5695 role_ids: &BTreeSet<RoleId>,
5696 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5697 ) {
5698 privilege_revokes.extend(iter::zip(
5699 iter::repeat(object_id),
5700 privileges
5701 .all_values()
5702 .filter(|privilege| role_ids.contains(&privilege.grantee))
5703 .cloned(),
5704 ));
5705 }
5706
5707 for replica in scx.catalog.get_cluster_replicas() {
5709 if role_ids.contains(&replica.owner_id()) {
5710 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5711 }
5712 }
5713
5714 for cluster in scx.catalog.get_clusters() {
5716 if role_ids.contains(&cluster.owner_id()) {
5717 if !cascade {
5719 let non_owned_bound_objects: Vec<_> = cluster
5720 .bound_objects()
5721 .into_iter()
5722 .map(|item_id| scx.catalog.get_item(item_id))
5723 .filter(|item| !role_ids.contains(&item.owner_id()))
5724 .collect();
5725 if !non_owned_bound_objects.is_empty() {
5726 let names: Vec<_> = non_owned_bound_objects
5727 .into_iter()
5728 .map(|item| {
5729 (
5730 item.item_type().to_string(),
5731 scx.catalog.resolve_full_name(item.name()).to_string(),
5732 )
5733 })
5734 .collect();
5735 return Err(PlanError::DependentObjectsStillExist {
5736 object_type: "cluster".to_string(),
5737 object_name: cluster.name().to_string(),
5738 dependents: names,
5739 });
5740 }
5741 }
5742 drop_ids.push(cluster.id().into());
5743 }
5744 update_privilege_revokes(
5745 SystemObjectId::Object(cluster.id().into()),
5746 cluster.privileges(),
5747 &role_ids,
5748 &mut privilege_revokes,
5749 );
5750 }
5751
5752 for item in scx.catalog.get_items() {
5754 if role_ids.contains(&item.owner_id()) {
5755 if !cascade {
5756 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5758 let non_owned_dependencies: Vec<_> = used_by
5759 .into_iter()
5760 .map(|item_id| scx.catalog.get_item(item_id))
5761 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5762 .filter(|item| !role_ids.contains(&item.owner_id()))
5763 .collect();
5764 if !non_owned_dependencies.is_empty() {
5765 let names: Vec<_> = non_owned_dependencies
5766 .into_iter()
5767 .map(|item| {
5768 let item_typ = item.item_type().to_string();
5769 let item_name =
5770 scx.catalog.resolve_full_name(item.name()).to_string();
5771 (item_typ, item_name)
5772 })
5773 .collect();
5774 Err(PlanError::DependentObjectsStillExist {
5775 object_type: item.item_type().to_string(),
5776 object_name: scx
5777 .catalog
5778 .resolve_full_name(item.name())
5779 .to_string()
5780 .to_string(),
5781 dependents: names,
5782 })
5783 } else {
5784 Ok(())
5785 }
5786 };
5787
5788 if let Some(id) = item.progress_id() {
5791 let progress_item = scx.catalog.get_item(&id);
5792 check_if_dependents_exist(progress_item.used_by())?;
5793 }
5794 check_if_dependents_exist(item.used_by())?;
5795 }
5796 drop_ids.push(item.id().into());
5797 }
5798 update_privilege_revokes(
5799 SystemObjectId::Object(item.id().into()),
5800 item.privileges(),
5801 &role_ids,
5802 &mut privilege_revokes,
5803 );
5804 }
5805
5806 for schema in scx.catalog.get_schemas() {
5808 if !schema.id().is_temporary() {
5809 if role_ids.contains(&schema.owner_id()) {
5810 if !cascade {
5811 let non_owned_dependencies: Vec<_> = schema
5812 .item_ids()
5813 .map(|item_id| scx.catalog.get_item(&item_id))
5814 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5815 .filter(|item| !role_ids.contains(&item.owner_id()))
5816 .collect();
5817 if !non_owned_dependencies.is_empty() {
5818 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5819 sql_bail!(
5820 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5821 full_schema_name.to_string().quoted()
5822 );
5823 }
5824 }
5825 drop_ids.push((*schema.database(), *schema.id()).into())
5826 }
5827 update_privilege_revokes(
5828 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5829 schema.privileges(),
5830 &role_ids,
5831 &mut privilege_revokes,
5832 );
5833 }
5834 }
5835
    // Drop all databases owned by one of the dropped roles.
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Revoke any system privileges granted to one of the dropped roles.
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    // Revoke default privileges that either belong to, or would be granted
    // to, one of the dropped roles.
    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    // Expand the drop set to include all transitive dependents.
    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    // System objects can never be dropped, so owning one aborts the plan.
    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}

fn plan_retain_history_option(
    scx: &StatementContext,
    retain_history: Option<OptionalDuration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    if let Some(OptionalDuration(lcw)) = retain_history {
        Ok(Some(plan_retain_history(scx, lcw)?))
    } else {
        Ok(None)
    }
}

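/// Plans a `RETAIN HISTORY` duration into a [`CompactionWindow`].
///
/// Summarizing the match below: a zero duration is an internal error (parsing
/// is expected to turn an explicit zero into `None`); a nonzero duration
/// becomes a finite compaction window, with values below the default gated
/// behind the unlimited-retain-history feature flag; and `None` disables
/// compaction entirely, gated behind the same flag.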
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A zero duration should have been converted to `None` during
        // parsing, so reaching this arm indicates an internal error.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Windows shorter than the default are permitted only when the
            // unlimited-retain-history feature flag is enabled.
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // An absent duration disables compaction, which is likewise gated
        // behind the unlimited-retain-history feature flag.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

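/// Plans the `WITH (...)` options accepted when creating a table.
///
/// `PARTITION BY` and the redacted-test option are validated here but are
/// feature gated; only `RETAIN HISTORY` contributes a planned option. An
/// illustrative sketch (option syntax is governed by the parser, so treat the
/// exact spelling as an assumption):
///
/// ```sql
/// CREATE TABLE t (a int) WITH (RETAIN HISTORY FOR '30 days');
/// ```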
fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

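/// Plans `ALTER INDEX ... SET (...)` and `ALTER INDEX ... RESET (...)`.
///
/// `RETAIN HISTORY` is the only alterable index option, and it must be the
/// only option in the list. An illustrative sketch (the parser grammar is
/// authoritative):
///
/// ```sql
/// ALTER INDEX my_index SET (RETAIN HISTORY FOR '30 days');
/// ALTER INDEX my_index RESET (RETAIN HISTORY);
/// ```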
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

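/// Plans `ALTER CLUSTER`, covering both the `SET (...)` and `RESET (...)`
/// forms. Managed and unmanaged clusters accept different options; the match
/// on `managed` below enforces which combinations are legal.
///
/// An illustrative sketch of statements planned here (exact option syntax is
/// an assumption; the parser is authoritative):
///
/// ```sql
/// ALTER CLUSTER my_cluster SET (REPLICATION FACTOR 2);
/// ALTER CLUSTER my_cluster RESET (SIZE);
/// ```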
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        // Storage objects that require a single replica prevent
                        // scaling the cluster beyond one replica.
                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // Zero-downtime reconfiguration temporarily doubles the
                        // replica count, so it is likewise incompatible with
                        // single-replica storage objects.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // Judge the DISK option against the new size if one was given,
                // otherwise against the cluster's current managed size.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

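/// Plans `ALTER ... SET CLUSTER`, which moves an object to another cluster.
///
/// Only materialized views are currently supported; indexes, sinks, and
/// sources report an explicit "unsupported" error, and everything else has no
/// associated cluster at all. An illustrative sketch:
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET CLUSTER other_cluster;
/// ```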
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Check whether the object type can live on a cluster at all.
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("no cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

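/// Plans `ALTER <object> RENAME TO <new name>`, dispatching on the object
/// type. An illustrative sketch:
///
/// ```sql
/// ALTER TABLE t RENAME TO t_renamed;
/// ```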
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Make sure the new name is not already taken in this database.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    // System schemas can never be renamed.
    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

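/// Plans `ALTER SCHEMA ... SWAP WITH ...`.
///
/// The swap is implemented as a three-way rename through a temporary schema
/// name; `gen_temp_suffix` supplies a suffix that is checked for collisions
/// via the provided callback. An illustrative sketch:
///
/// ```sql
/// ALTER SCHEMA blue SWAP WITH green;
/// ```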
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // Swapping a system schema is never allowed.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // The temporary name used for the three-way rename must not collide with
    // an existing schema.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Using ALTER VIEW on a materialized view gets a dedicated error.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // Types and items live in overlapping namespaces, so the new name
            // must be free of both conflicting types and conflicting items.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            }
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: normalize::ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // The temporary name is free, so we can use it.
            Err(CatalogError::UnknownCluster(_)) => true,
            // The temporary name is already taken, or resolution failed for
            // some other reason.
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

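/// Plans `ALTER <object> ... SWAP WITH ...` (feature gated), which atomically
/// exchanges the names of two schemas or two clusters. An illustrative
/// sketch:
///
/// ```sql
/// ALTER CLUSTER blue SWAP WITH green;
/// ```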
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // Generates a temporary suffix, retrying a bounded number of times if the
    // caller-provided check reports a collision.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

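/// Shared implementation for the `RETAIN HISTORY` forms of `ALTER INDEX`,
/// `ALTER SOURCE`, and `ALTER ... RETAIN HISTORY`.
///
/// A `history` of `None` acts as a reset, restoring the default logical
/// compaction window.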
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Using ALTER VIEW on a materialized view gets a dedicated error.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                // A reset restores the default compaction window.
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

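/// Plans `ALTER SECRET ... AS ...`, which replaces a secret's contents. An
/// illustrative sketch (the secret name and value are placeholders):
///
/// ```sql
/// ALTER SECRET my_secret AS 'new-value';
/// ```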
pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

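/// Plans `ALTER CONNECTION`, which can either rotate the keys of an SSH
/// connection or set/drop connection options, optionally revalidating the
/// connection. Illustrative sketches (the option name below is an assumption;
/// each connection type accepts its own options):
///
/// ```sql
/// ALTER CONNECTION ssh_conn ROTATE KEYS;
/// ALTER CONNECTION kafka_conn SET (BROKER 'broker:9092') WITH (VALIDATE = true);
/// ```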
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Collect the names of all options being SET or DROPped so they can be
    // validated together.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition the actions into the options being set and those being
    // dropped.
    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Reuse the CREATE CONNECTION option extraction to type-check the new
    // values and to learn which options were touched.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Setting one option of a mutually exclusive set while dropping
        // another is ambiguous, so reject it.
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // When any member of a mutually exclusive set changes, drop the other
        // members so the new value fully replaces the old configuration.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

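/// Plans `ALTER SINK`. Changing the sink's upstream relation re-plans the
/// original `CREATE SINK` statement against the new relation and bumps the
/// sink's version; setting or resetting other options is unsupported. An
/// illustrative sketch (syntax is an assumption; the parser is
/// authoritative):
///
/// ```sql
/// ALTER SINK my_sink SET FROM new_input;
/// ```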
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always operate on the latest version of the sink definition.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // Round-trip the sink through its stored CREATE SQL so it can be
            // re-planned against the new upstream relation.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Substitute the new upstream relation and re-plan.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            // Bump the version to record that the definition changed.
            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

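/// Plans `ALTER SOURCE`. Only `RETAIN HISTORY` may be set or reset here;
/// dropping subsources is rejected outright, and the add-subsource and
/// refresh-references forms are handled during purification before reaching
/// this function.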
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            // No other source option is alterable.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

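/// Plans `ALTER SYSTEM SET <name> = <value>`, which updates a system
/// configuration parameter. An illustrative sketch (the parameter name is a
/// placeholder, not an endorsement of any particular variable):
///
/// ```sql
/// ALTER SYSTEM SET some_system_parameter = 100;
/// ```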
pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_role(
    scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs, scx)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

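/// Plans `ALTER TABLE ... ADD COLUMN ...` (feature gated). New columns are
/// always nullable, since existing rows have no value for them. An
/// illustrative sketch:
///
/// ```sql
/// ALTER TABLE t ADD COLUMN IF NOT EXISTS c text;
/// ```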
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                // Always plan against the latest version of the relation.
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // New columns are always nullable, since existing rows have no value for
    // them.
    let column_type = scalar_type.nullable(true);
    // Record the raw SQL type so the catalog can re-render the column
    // definition.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_alter_materialized_view_apply_replacement(
    _: &StatementContext,
    _: AlterMaterializedViewApplyReplacementStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_materialized_view_apply_replacement(
    scx: &StatementContext,
    stmt: AlterMaterializedViewApplyReplacementStatement,
) -> Result<Plan, PlanError> {
    let AlterMaterializedViewApplyReplacementStatement {
        if_exists,
        name,
        replacement_name,
    } = stmt;

    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

    let object_type = ObjectType::MaterializedView;
    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
        .expect("if_exists not set");

    if replacement.replacement_target() != Some(mv.id()) {
        return Err(PlanError::InvalidReplacement {
            item_type: mv.item_type(),
            item_name: scx.catalog.minimal_qualification(mv.name()),
            replacement_type: replacement.item_type(),
            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
        });
    }

    Ok(Plan::AlterMaterializedViewApplyReplacement(
        AlterMaterializedViewApplyReplacementPlan {
            id: mv.id(),
            replacement_id: replacement.id(),
        },
    ))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

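/// Plans `COMMENT ON <object> ... IS <string>`. Comments are capped at
/// `MAX_COMMENT_LENGTH` bytes, and column comments record a 1-based column
/// position as the plan's sub-component. An illustrative sketch:
///
/// ```sql
/// COMMENT ON TABLE t IS 'tracks widget inventory';
/// ```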
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // Enforce the maximum comment length.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // Column positions are stored as an i32; reject positions that would
    // overflow.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns an error if the given cluster is managed.
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}