1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::{Either, Itertools};
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::soft_panic_or_log;
32use mz_ore::str::StrExt;
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43 preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48 AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49 AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50 AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51 AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52 AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53 AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54 AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55 ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56 ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57 ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58 ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
59 CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
60 CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
61 CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
62 CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
63 CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
64 CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
65 CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
66 CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
67 CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
68 CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
69 CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
70 CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
71 DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
72 FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
73 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90 IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91 SinkEnvelope, StorageSinkConnection,
92};
93use mz_storage_types::sources::encoding::{
94 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95 SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111 PostgresSourceConnection, PostgresSourcePublicationDetails,
112 ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121 SqlServerSourceExtras, Timeline,
122};
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133 Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140 CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
141 scalar_type_from_sql,
142};
143use crate::plan::scope::Scope;
144use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
145use crate::plan::statement::{StatementContext, StatementDesc, scl};
146use crate::plan::typeconv::CastContext;
147use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
148use crate::plan::{
149 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
150 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
151 AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
152 AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
153 AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
154 AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
155 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
156 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
157 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
158 CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
159 CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
160 CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
161 CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
162 MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
163 PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
164 Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
165 WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
166 transform_ast,
167};
168use crate::session::vars::{
169 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
170 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
171 ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS,
172};
173use crate::{names, parse};
174
175mod connection;
176
/// Upper bound on the number of columns a single planned relation may have
/// (enforced below, e.g. when planning webhook source columns).
const MAX_NUM_COLUMNS: usize = 256;
182
/// Matches replica names of the form `r<digits>` (e.g. `r1`, `r42`).
// NOTE(review): `(\d)+` accepts exactly the same strings as `(\d+)`, but the
// capture group retains only the *last* digit. If any use site reads capture
// group 1 to recover the replica number, the pattern should be `^r(\d+)$` —
// confirm at the call sites (not visible in this chunk).
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
185
186fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
190 if partition_by.len() > desc.len() {
191 tracing::error!(
192 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
193 desc.len(),
194 partition_by.len()
195 );
196 partition_by.truncate(desc.len());
197 }
198
199 let desc_prefix = desc.iter().take(partition_by.len());
200 for (idx, ((desc_name, desc_type), partition_name)) in
201 desc_prefix.zip_eq(partition_by).enumerate()
202 {
203 let partition_name = normalize::column_name(partition_name);
204 if *desc_name != partition_name {
205 sql_bail!(
206 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
207 );
208 }
209 if !preserves_order(&desc_type.scalar_type) {
210 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
211 }
212 }
213 Ok(())
214}
215
/// Describes the output of `CREATE DATABASE`: the statement returns no rows,
/// so the description carries no relation description.
pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
222
223pub fn plan_create_database(
224 _: &StatementContext,
225 CreateDatabaseStatement {
226 name,
227 if_not_exists,
228 }: CreateDatabaseStatement,
229) -> Result<Plan, PlanError> {
230 Ok(Plan::CreateDatabase(CreateDatabasePlan {
231 name: normalize::ident(name.0),
232 if_not_exists,
233 }))
234}
235
/// Describes the output of `CREATE SCHEMA`: the statement returns no rows,
/// so the description carries no relation description.
pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
242
243pub fn plan_create_schema(
244 scx: &StatementContext,
245 CreateSchemaStatement {
246 mut name,
247 if_not_exists,
248 }: CreateSchemaStatement,
249) -> Result<Plan, PlanError> {
250 if name.0.len() > 2 {
251 sql_bail!("schema name {} has more than two components", name);
252 }
253 let schema_name = normalize::ident(
254 name.0
255 .pop()
256 .expect("names always have at least one component"),
257 );
258 let database_spec = match name.0.pop() {
259 None => match scx.catalog.active_database() {
260 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
261 None => sql_bail!("no database specified and no active database"),
262 },
263 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
264 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
265 Err(_) => sql_bail!("invalid database {}", n.as_str()),
266 },
267 };
268 Ok(Plan::CreateSchema(CreateSchemaPlan {
269 database_spec,
270 schema_name,
271 if_not_exists,
272 }))
273}
274
/// Describes the output of `CREATE TABLE`: the statement returns no rows,
/// so the description carries no relation description.
pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
281
/// Plans `CREATE TABLE` for user-writable tables.
///
/// Builds the table's (possibly versioned) relation description from the
/// column definitions, validates UNIQUE/PRIMARY KEY constraints, gates
/// unsupported constraint kinds behind feature flags, and checks for name
/// collisions in the catalog before producing a [`CreateTablePlan`].
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    // Column names at the table's *root* version: columns marked `Versioned`
    // were added after creation and are excluded here (they are replayed via
    // `changes` below).
    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // `column_types` parallels `names` (root-version columns only);
    // `defaults` is per *statement* column; `changes` maps a relation version
    // to the column added at that version; `keys` collects candidate keys.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Plan the default expression purely for validation; the
                    // (transformed) AST is what gets stored.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    // A column-level UNIQUE is a single-column key; PRIMARY
                    // additionally implies NOT NULL.
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    // Record the column addition against its version; it is
                    // replayed into the VersionedRelationDesc below.
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // Versioned columns are not part of the root-version type.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                // PRIMARY KEY columns are implicitly NOT NULL.
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A plain UNIQUE over a nullable column (without
                                // NULLS NOT DISTINCT) is not a key.
                                // NOTE(review): `break 'c` abandons ALL remaining
                                // constraints, not just this one; if the intent is
                                // to skip only this constraint, `continue 'c`
                                // would be expected — confirm.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // The primary key, if any, is kept first among the keys.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Parsed but only allowed behind an unsafe feature flag.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    // Declared keys (from any source) are themselves feature-gated.
    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Reject a name collision unless IF NOT EXISTS was given.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Replay versioned column additions in version order (BTreeMap iteration);
    // each must land at exactly the version the statement claimed.
    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options are validated against the root-version description.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    // RETAIN HISTORY, if present among the planned options.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
501
/// Describes the output of `CREATE TABLE ... FROM SOURCE`: the statement
/// returns no rows, so the description carries no relation description.
pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
508
/// Describes the output of `CREATE SOURCE ... FROM WEBHOOK`: the statement
/// returns no rows, so the description carries no relation description.
pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
515
/// Describes the output of `CREATE SOURCE`: the statement returns no rows,
/// so the description carries no relation description.
pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
522
/// Describes the output of `CREATE SUBSOURCE`: the statement returns no rows,
/// so the description carries no relation description.
pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
529
// Generates `CreateSourceOptionExtracted`: the typed `WITH (...)` options of
// `CREATE SOURCE` (timestamping interval and optional history retention).
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Generates `PgConfigOptionExtracted`: typed options for Postgres source
// connections. TEXT/EXCLUDE COLUMNS default to empty lists when unspecified.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Generates `MySqlConfigOptionExtracted`: typed options for MySQL source
// connections.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Generates `SqlServerConfigOptionExtracted`: typed options for SQL Server
// source connections.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
557
/// Plans `CREATE SOURCE ... FROM WEBHOOK` (and, when `is_table` is set, the
/// `CREATE TABLE ... FROM WEBHOOK` variant).
///
/// Builds the relation description from the body format plus any requested
/// header columns, validates the optional `CHECK` expression, and produces
/// either a [`CreateTablePlan`] or a [`CreateSourcePlan`].
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    // The table flavor is gated behind the table-from-source feature flag.
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // Resolve (and possibly rewrite into the statement) the target cluster.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    // Unless multi-replica sources are enabled, webhook sources may only live
    // on clusters with at most one replica.
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    // Capture the normalized statement text *after* the cluster rewrite above.
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // A validation that reads no columns would accept or reject every
        // request uniformly, which is surely not what the user meant.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation must be deterministic; `now()` is the one allowed
        // unmaterializable function.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    // Only BYTES, JSON, and TEXT bodies are supported for webhooks.
    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    // `column_ty` and `column_names` are built up in lockstep; the first
    // column is always the request body.
    let mut column_ty = vec![
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // `INCLUDE HEADERS` adds a map column of all (filtered) headers.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        // Split the filter list into allow- and block-lists.
        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // `INCLUDE HEADER 'x' AS col` mappings each become their own (nullable)
    // column, optionally as bytes instead of text.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Names and types must have stayed in lockstep.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Reject duplicate column names across body/headers/mappings.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Reject a name collision unless IF NOT EXISTS was given.
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let timeline = Timeline::EpochMilliseconds;

    // The same webhook description becomes either a table-backed or a
    // source-backed object depending on `is_table`. Note that only the table
    // flavor records the cluster id inside the data source itself.
    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}
755
/// Plans `CREATE SOURCE` for external systems (Kafka, Postgres, MySQL,
/// SQL Server, and load generators; see [`plan_generic_source_connection`]).
///
/// Validates envelope/format/metadata combinations, applies the envelope and
/// encoding to derive the relation description, enforces key constraints and
/// timestamp-interval bounds, and produces a [`CreateSourcePlan`].
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    // Purification is expected to have rewritten external references away.
    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    // When the new table-from-source syntax is being forced, the legacy
    // per-source ENVELOPE/FORMAT/INCLUDE clauses are rejected.
    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    // INCLUDE HEADERS is Kafka-only; other INCLUDE metadata is Kafka or
    // load-generator only.
    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    // INCLUDE metadata is only meaningful under these envelopes.
    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    // Only Kafka sources contribute extra metadata columns to the
    // description.
    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Derive the relation description from envelope + format + metadata.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    // Apply any user-specified column aliases.
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // PRIMARY KEY NOT ENFORCED: declare a key without enforcing uniqueness
    // (feature-gated).
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        // Resolve each key column to its index, rejecting unknown or
        // ambiguous names.
        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        // A user key may not override a key the envelope already established.
        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    // Clamp the timestamp interval to system bounds; the bounds check is only
    // applied when a plan context is present.
    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            if scx.pcx.is_some() {
                let min = scx.catalog.system_vars().min_timestamp_interval();
                let max = scx.catalog.system_vars().max_timestamp_interval();
                if duration < min || duration > max {
                    return Err(PlanError::InvalidTimestampInterval {
                        min,
                        max,
                        requested: duration,
                    });
                }
            }
            duration
        }
        None => scx.catalog.system_vars().default_timestamp_interval(),
    };

    // A named progress subsource selects the old-syntax ingestion shape;
    // otherwise the source itself carries only progress data.
    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            // Connection-specific export details for the primary output.
            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            // No progress subsource: the source relation is the progress
            // (timestamp) description.
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Reject a name collision unless IF NOT EXISTS was given.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Resolve (and possibly rewrite into the statement) the target cluster,
    // before the statement text is normalized below.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // CDCv2 sources carry their own timestamps and get a dedicated timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1024
1025pub fn plan_generic_source_connection(
1026 scx: &StatementContext<'_>,
1027 source_connection: &CreateSourceConnection<Aug>,
1028 include_metadata: &Vec<SourceIncludeMetadata>,
1029) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1030 Ok(match source_connection {
1031 CreateSourceConnection::Kafka {
1032 connection,
1033 options,
1034 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1035 scx,
1036 connection,
1037 options,
1038 include_metadata,
1039 )?),
1040 CreateSourceConnection::Postgres {
1041 connection,
1042 options,
1043 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1044 scx, connection, options,
1045 )?),
1046 CreateSourceConnection::SqlServer {
1047 connection,
1048 options,
1049 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1050 scx, connection, options,
1051 )?),
1052 CreateSourceConnection::MySql {
1053 connection,
1054 options,
1055 } => {
1056 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1057 }
1058 CreateSourceConnection::LoadGenerator { generator, options } => {
1059 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1060 scx,
1061 generator,
1062 options,
1063 include_metadata,
1064 )?)
1065 }
1066 })
1067}
1068
1069fn plan_load_generator_source_connection(
1070 scx: &StatementContext<'_>,
1071 generator: &ast::LoadGenerator,
1072 options: &Vec<LoadGeneratorOption<Aug>>,
1073 include_metadata: &Vec<SourceIncludeMetadata>,
1074) -> Result<LoadGeneratorSourceConnection, PlanError> {
1075 let load_generator =
1076 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1077 let LoadGeneratorOptionExtracted {
1078 tick_interval,
1079 as_of,
1080 up_to,
1081 ..
1082 } = options.clone().try_into()?;
1083 let tick_micros = match tick_interval {
1084 Some(interval) => Some(interval.as_micros().try_into()?),
1085 None => None,
1086 };
1087 if up_to < as_of {
1088 sql_bail!("UP TO cannot be less than AS OF");
1089 }
1090 Ok(LoadGeneratorSourceConnection {
1091 load_generator,
1092 tick_micros,
1093 as_of,
1094 up_to,
1095 })
1096}
1097
1098fn plan_mysql_source_connection(
1099 scx: &StatementContext<'_>,
1100 connection: &ResolvedItemName,
1101 options: &Vec<MySqlConfigOption<Aug>>,
1102) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1103 let connection_item = scx.get_item_by_resolved_name(connection)?;
1104 match connection_item.connection()? {
1105 Connection::MySql(connection) => connection,
1106 _ => sql_bail!(
1107 "{} is not a MySQL connection",
1108 scx.catalog.resolve_full_name(connection_item.name())
1109 ),
1110 };
1111 let MySqlConfigOptionExtracted {
1112 details,
1113 text_columns: _,
1117 exclude_columns: _,
1118 seen: _,
1119 } = options.clone().try_into()?;
1120 let details = details
1121 .as_ref()
1122 .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
1123 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1124 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1125 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1126 Ok(MySqlSourceConnection {
1127 connection: connection_item.id(),
1128 connection_id: connection_item.id(),
1129 details,
1130 })
1131}
1132
1133fn plan_sqlserver_source_connection(
1134 scx: &StatementContext<'_>,
1135 connection: &ResolvedItemName,
1136 options: &Vec<SqlServerConfigOption<Aug>>,
1137) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1138 let connection_item = scx.get_item_by_resolved_name(connection)?;
1139 match connection_item.connection()? {
1140 Connection::SqlServer(connection) => connection,
1141 _ => sql_bail!(
1142 "{} is not a SQL Server connection",
1143 scx.catalog.resolve_full_name(connection_item.name())
1144 ),
1145 };
1146 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1147 let details = details
1148 .as_ref()
1149 .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
1150 let extras = hex::decode(details)
1151 .map_err(|e| sql_err!("{e}"))
1152 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1153 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1154 Ok(SqlServerSourceConnection {
1155 connection_id: connection_item.id(),
1156 connection: connection_item.id(),
1157 extras,
1158 })
1159}
1160
1161fn plan_postgres_source_connection(
1162 scx: &StatementContext<'_>,
1163 connection: &ResolvedItemName,
1164 options: &Vec<PgConfigOption<Aug>>,
1165) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1166 let connection_item = scx.get_item_by_resolved_name(connection)?;
1167 let PgConfigOptionExtracted {
1168 details,
1169 publication,
1170 text_columns: _,
1174 exclude_columns: _,
1178 seen: _,
1179 } = options.clone().try_into()?;
1180 let details = details
1181 .as_ref()
1182 .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
1183 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1184 let details =
1185 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1186 let publication_details =
1187 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1188 Ok(PostgresSourceConnection {
1189 connection: connection_item.id(),
1190 connection_id: connection_item.id(),
1191 publication: publication.expect("validated exists during purification"),
1192 publication_details,
1193 })
1194}
1195
/// Plans a Kafka source connection.
///
/// Verifies that the referenced catalog item is a Kafka connection, extracts
/// and validates the Kafka-specific options (topic, per-partition start
/// offsets, metadata refresh interval), and resolves the requested
/// `INCLUDE <metadata>` items into typed metadata columns.
fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        // START TIMESTAMP is intentionally ignored here; presumably it is
        // resolved into START OFFSET during purification — TODO confirm.
        start_timestamp: _, start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    // START OFFSET is a list of offsets indexed by partition id; each must be
    // nonnegative.
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    // Cap the metadata refresh interval at one hour.
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    // Map each `INCLUDE <item>` clause to a (column name, metadata kind)
    // pair, defaulting the column name to the item's own name when no alias
    // is given.
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // `INCLUDE KEY` is handled by the key envelope machinery,
                // not as a metadata column.
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}
1290
/// Computes the relation description, planned envelope, and data encoding for
/// a source export from its AST-level envelope and format specifiers.
///
/// When a format is present, the pre-decoding value schema must be a single
/// `bytes` column; the format's own key/value descriptions then replace it.
/// Returns the final `RelationDesc` (including requested metadata columns),
/// the planned `SourceEnvelope`, and the optional `SourceDataEncoding`.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // Format decoding consumes raw bytes, so the source must produce
            // exactly one `bytes` column for a format to be applicable.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // For ENVELOPE NONE over Kafka, mark every key column nullable:
            // records are not guaranteed to carry a key.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // The KEY VALUE load generator produces keys without a key format, so it
    // may use a key envelope even when no key encoding exists.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    // Debezium values already include the key, so INCLUDE KEY is rejected.
    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            // Debezium is planned as an upsert keyed on the value's `after`
            // field; a failed typecheck is only reported as-is for Avro,
            // since other formats cannot carry the required structure.
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // UPSERT always materializes the key, so synthesize an unnamed
            // key envelope when the user did not request one.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // The error policy selects between the default upsert style and
            // the feature-gated inline-error style (errors land in a column,
            // named "error" unless aliased).
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            // ENVELOPE MATERIALIZE: feature-gated and Avro-only.
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    // Append the requested metadata columns, then let the envelope compute
    // the final relation description.
    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1469
/// Plans the `RelationDesc` for a source export (subsource or source table)
/// from user-declared columns and table constraints.
///
/// Supported column options are `NOT NULL` and `UNIQUE`/`PRIMARY KEY`;
/// defaults, foreign keys, and check constraints are rejected.
/// `UNIQUE`/`PRIMARY KEY` declarations become key sets on the resulting
/// relation type, with the primary key (if any) placed first.
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Accumulate column types (with nullability) and declared key sets.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        // Columns are nullable unless NOT NULL or PRIMARY KEY says otherwise.
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    // A single-column UNIQUE/PRIMARY KEY becomes a unary key.
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                // Resolve the constraint's column names to column indices.
                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                // PRIMARY KEY columns are implicitly NOT NULL.
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A UNIQUE constraint over a nullable column
                                // without NULLS NOT DISTINCT is not a valid
                                // key. NOTE(review): `break 'c` abandons ALL
                                // remaining constraints, not just this one —
                                // confirm that is intended (vs. `continue 'c`).
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // The primary key is kept first so it is the relation's
                // preferred key.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}
1577
// `WITH (...)` options accepted by `CREATE SUBSOURCE`. The macro generates a
// `CreateSubsourceOptionExtracted` struct for typed access to the options.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1587
/// Plans a `CREATE SUBSOURCE` statement.
///
/// A subsource is either a progress collection (`PROGRESS` option) or an
/// ingestion export of a parent source (`EXTERNAL REFERENCE` + `OF SOURCE`).
/// For exports, the purification-provided `DETAILS` option is decoded into
/// per-connection export details.
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // Exactly one of the two subsource shapes must be present; purification
    // guarantees this, so a violation is a planner bug.
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // When the source-table syntax is being forced, new subsources are
        // rejected in favor of `CREATE TABLE ... FROM SOURCE`.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        // DETAILS is a hex-encoded protobuf produced during purification.
        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        // Convert statement-level details into in-memory export details,
        // folding in TEXT/EXCLUDE COLUMNS where the connection supports them.
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources are always planned with ENVELOPE NONE and no
            // encoding; decoding happens in the parent ingestion.
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}
1732
// `WITH (...)` options accepted by `CREATE TABLE ... FROM SOURCE`. The macro
// generates a `TableFromSourceOptionExtracted` struct for typed access.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1741
/// Plans `CREATE TABLE ... FROM SOURCE`, the source-table replacement for
/// `CREATE SUBSOURCE`.
///
/// Decodes the purification-provided `DETAILS` option into per-connection
/// export details, validates `INCLUDE` metadata against the source kind,
/// derives the relation description (either from user-declared columns or
/// from the source's defaults refined by format/envelope decoding), and
/// produces a `CreateTablePlan` whose data source is an ingestion export.
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    // An omitted envelope clause means ENVELOPE NONE.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // DETAILS is a hex-encoded protobuf produced during purification.
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    // INCLUDE HEADERS only applies to Kafka; other INCLUDE metadata is
    // limited to Kafka and load-generator sources.
    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    // Convert statement-level details into in-memory export details, folding
    // in TEXT/EXCLUDE COLUMNS where the connection supports them.
    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Map each `INCLUDE <item>` clause to a (column name, metadata
            // kind) pair, defaulting the column name to the item's own name.
            // NOTE(review): this mapping duplicates the one in
            // `plan_kafka_source_connection`; a shared helper would keep the
            // two in sync.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // `INCLUDE KEY` is handled by the key envelope
                        // machinery, not as a metadata column.
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    // With declared columns or constraints, plan the desc directly from
    // them; otherwise start from the connection's default key/value
    // descriptions and let format/envelope decoding refine them.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    // The columns clause may also just rename the decoded columns.
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // ENVELOPE MATERIALIZE (CDCv2) sources get their own external timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    // PARTITION BY is feature-gated and validated against the final desc.
    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2005
// Options accepted by `CREATE SOURCE ... FROM LOAD GENERATOR`. The macro
// generates a `LoadGeneratorOptionExtracted` struct for typed access; which
// options each generator variant permits is enforced separately by
// `LoadGeneratorOptionExtracted::ensure_only_valid_options`.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2021
2022impl LoadGeneratorOptionExtracted {
2023 pub(super) fn ensure_only_valid_options(
2024 &self,
2025 loadgen: &ast::LoadGenerator,
2026 ) -> Result<(), PlanError> {
2027 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2028
2029 let mut options = self.seen.clone();
2030
2031 let permitted_options: &[_] = match loadgen {
2032 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2033 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2034 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2035 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2036 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2037 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2038 ast::LoadGenerator::KeyValue => &[
2039 TickInterval,
2040 Keys,
2041 SnapshotRounds,
2042 TransactionalSnapshot,
2043 ValueSize,
2044 Seed,
2045 Partitions,
2046 BatchSize,
2047 ],
2048 };
2049
2050 for o in permitted_options {
2051 options.remove(o);
2052 }
2053
2054 if !options.is_empty() {
2055 sql_bail!(
2056 "{} load generators do not support {} values",
2057 loadgen,
2058 options.iter().join(", ")
2059 )
2060 }
2061
2062 Ok(())
2063 }
2064}
2065
2066pub(crate) fn load_generator_ast_to_generator(
2067 scx: &StatementContext,
2068 loadgen: &ast::LoadGenerator,
2069 options: &[LoadGeneratorOption<Aug>],
2070 include_metadata: &[SourceIncludeMetadata],
2071) -> Result<LoadGenerator, PlanError> {
2072 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2073 extracted.ensure_only_valid_options(loadgen)?;
2074
2075 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2076 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2077 }
2078
2079 let load_generator = match loadgen {
2080 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2081 ast::LoadGenerator::Clock => {
2082 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2083 LoadGenerator::Clock
2084 }
2085 ast::LoadGenerator::Counter => {
2086 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2087 let LoadGeneratorOptionExtracted {
2088 max_cardinality, ..
2089 } = extracted;
2090 LoadGenerator::Counter { max_cardinality }
2091 }
2092 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2093 ast::LoadGenerator::Datums => {
2094 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2095 LoadGenerator::Datums
2096 }
2097 ast::LoadGenerator::Tpch => {
2098 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2099
2100 let sf: f64 = scale_factor.unwrap_or(0.01);
2102 if !sf.is_finite() || sf < 0.0 {
2103 sql_bail!("unsupported scale factor {sf}");
2104 }
2105
2106 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2107 let total = (sf * multiplier).floor();
2108 let mut i = i64::try_cast_from(total)
2109 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2110 if i < 1 {
2111 i = 1;
2112 }
2113 Ok(i)
2114 };
2115
2116 let count_supplier = f_to_i(10_000f64)?;
2119 let count_part = f_to_i(200_000f64)?;
2120 let count_customer = f_to_i(150_000f64)?;
2121 let count_orders = f_to_i(150_000f64 * 10f64)?;
2122 let count_clerk = f_to_i(1_000f64)?;
2123
2124 LoadGenerator::Tpch {
2125 count_supplier,
2126 count_part,
2127 count_customer,
2128 count_orders,
2129 count_clerk,
2130 }
2131 }
2132 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2133 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2134 let LoadGeneratorOptionExtracted {
2135 keys,
2136 snapshot_rounds,
2137 transactional_snapshot,
2138 value_size,
2139 tick_interval,
2140 seed,
2141 partitions,
2142 batch_size,
2143 ..
2144 } = extracted;
2145
2146 let mut include_offset = None;
2147 for im in include_metadata {
2148 match im {
2149 SourceIncludeMetadata::Offset { alias } => {
2150 include_offset = match alias {
2151 Some(alias) => Some(alias.to_string()),
2152 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2153 }
2154 }
2155 SourceIncludeMetadata::Key { .. } => continue,
2156
2157 _ => {
2158 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2159 }
2160 };
2161 }
2162
2163 let lgkv = KeyValueLoadGenerator {
2164 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2165 snapshot_rounds: snapshot_rounds
2166 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2167 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2169 value_size: value_size
2170 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2171 partitions: partitions
2172 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2173 tick_interval,
2174 batch_size: batch_size
2175 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2176 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2177 include_offset,
2178 };
2179
2180 if lgkv.keys == 0
2181 || lgkv.partitions == 0
2182 || lgkv.value_size == 0
2183 || lgkv.batch_size == 0
2184 {
2185 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2186 }
2187
2188 if lgkv.keys % lgkv.partitions != 0 {
2189 sql_bail!("KEYS must be a multiple of PARTITIONS")
2190 }
2191
2192 if lgkv.batch_size > lgkv.keys {
2193 sql_bail!("KEYS must be larger than BATCH SIZE")
2194 }
2195
2196 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2199 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2200 }
2201
2202 if lgkv.snapshot_rounds == 0 {
2203 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2204 }
2205
2206 LoadGenerator::KeyValue(lgkv)
2207 }
2208 };
2209
2210 Ok(load_generator)
2211}
2212
2213fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2214 let before = value_desc.get_by_name(&"before".into());
2215 let (after_idx, after_ty) = value_desc
2216 .get_by_name(&"after".into())
2217 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2218 let before_idx = if let Some((before_idx, before_ty)) = before {
2219 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2220 sql_bail!("'before' column must be of type record");
2221 }
2222 if before_ty != after_ty {
2223 sql_bail!("'before' type differs from 'after' column");
2224 }
2225 Some(before_idx)
2226 } else {
2227 None
2228 };
2229 Ok((before_idx, after_idx))
2230}
2231
2232fn get_encoding(
2233 scx: &StatementContext,
2234 format: &FormatSpecifier<Aug>,
2235 envelope: &ast::SourceEnvelope,
2236) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2237 let encoding = match format {
2238 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2239 FormatSpecifier::KeyValue { key, value } => {
2240 let key = {
2241 let encoding = get_encoding_inner(scx, key)?;
2242 Some(encoding.key.unwrap_or(encoding.value))
2243 };
2244 let value = get_encoding_inner(scx, value)?.value;
2245 SourceDataEncoding { key, value }
2246 }
2247 };
2248
2249 let requires_keyvalue = matches!(
2250 envelope,
2251 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2252 );
2253 let is_keyvalue = encoding.key.is_some();
2254 if requires_keyvalue && !is_keyvalue {
2255 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2256 };
2257
2258 Ok(encoding)
2259}
2260
2261fn source_sink_cluster_config<'a, 'ctx>(
2267 scx: &'a StatementContext<'ctx>,
2268 in_cluster: &mut Option<ResolvedClusterName>,
2269) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2270 let cluster = match in_cluster {
2271 None => {
2272 let cluster = scx.catalog.resolve_cluster(None)?;
2273 *in_cluster = Some(ResolvedClusterName {
2274 id: cluster.id(),
2275 print_name: None,
2276 });
2277 cluster
2278 }
2279 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2280 };
2281
2282 Ok(cluster)
2283}
2284
// Options accepted by `FORMAT AVRO ... WITH (...)`; `CONFLUENT WIRE FORMAT`
// defaults to true.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2286
/// The schema information needed to construct an Avro source data encoding,
/// whether it came from an inline definition or a schema registry seed.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, when one exists (only populated from a CSR seed).
    pub key_schema: Option<String>,
    /// The value schema.
    pub value_schema: String,
    /// Schemas referenced by the key schema, if any.
    pub key_reference_schemas: Vec<String>,
    /// Schemas referenced by the value schema, if any.
    pub value_reference_schemas: Vec<String>,
    /// The CSR connection backing these schemas; `None` for inline schemas.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether payloads are framed with the Confluent wire format header.
    pub confluent_wire_format: bool,
}
2298
/// Builds the encoding for a single bare `FORMAT` specifier.
///
/// Most formats describe only a value encoding, in which case the returned
/// `SourceDataEncoding` has `key: None`. The exceptions are CSR-backed Avro
/// and Protobuf schemas whose seed carries a key schema; those return early
/// with both key and value encodings populated.
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                key_reference_schemas,
                value_reference_schemas,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                // An inline schema: no CSR connection and no key schema.
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                // A CSR-backed schema: purification must already have
                // resolved the seed, which carries the fetched schemas.
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            key_reference_schemas: seed.key_reference_schemas.clone(),
                            value_reference_schemas: seed.value_reference_schemas.clone(),
                            csr_connection: Some(csr_connection),
                            // CSR-sourced schemas always use Confluent wire
                            // format framing.
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            // A key schema implies a key/value encoding: return early with
            // both sides populated.
            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        reference_schemas: key_reference_schemas,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        reference_schemas: value_reference_schemas,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    reference_schemas: value_reference_schemas,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                // As with Avro, purification must have filled in the seed.
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    // A key schema in the seed implies a key/value encoding.
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                // The inline "schema" text is a serialized descriptor set.
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    // Purification replaces a bare `HEADER` with concrete
                    // column names; an empty list here is a planner bug.
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}
2488
2489fn get_key_envelope(
2491 included_items: &[SourceIncludeMetadata],
2492 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2493 key_envelope_no_encoding: bool,
2494) -> Result<KeyEnvelope, PlanError> {
2495 let key_definition = included_items
2496 .iter()
2497 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2498 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2499 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2500 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2501 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2502 (Some(name), _) if key_envelope_no_encoding => {
2503 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2504 }
2505 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2506 (_, None) => {
2507 sql_bail!(
2511 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2512 got bare FORMAT"
2513 );
2514 }
2515 }
2516 } else {
2517 Ok(KeyEnvelope::None)
2518 }
2519}
2520
2521fn get_unnamed_key_envelope(
2524 key: Option<&DataEncoding<ReferencedConnection>>,
2525) -> Result<KeyEnvelope, PlanError> {
2526 let is_composite = match key {
2530 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2531 Some(
2532 DataEncoding::Avro(_)
2533 | DataEncoding::Csv(_)
2534 | DataEncoding::Protobuf(_)
2535 | DataEncoding::Regex { .. },
2536 ) => true,
2537 None => false,
2538 };
2539
2540 if is_composite {
2541 Ok(KeyEnvelope::Flattened)
2542 } else {
2543 Ok(KeyEnvelope::Named("key".to_string()))
2544 }
2545}
2546
/// Describes `CREATE VIEW`; DDL statements produce no result rows.
pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2553
/// Plans the shared body of `CREATE [TEMPORARY] VIEW`: normalizes the
/// statement, plans the defining query, applies the optional column alias
/// list, and rejects duplicate column names.
///
/// Returns the qualified name the view will be created under along with the
/// planned `View`.
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    // Normalize the full statement first so `create_sql` round-trips.
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    // View queries must have a trivial finishing; anything else is a planner
    // bug.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    // Parameters ($1, $2, ...) cannot be embedded in a persistent definition.
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Temporary views get a session-scoped qualified name.
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Apply the optional user-provided column alias list.
    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}
2626
/// Plans `CREATE VIEW`, layering `OR REPLACE` / `IF NOT EXISTS` semantics on
/// top of [`plan_view`].
pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    // When the planning context asks to ignore IF EXISTS errors, both the
    // replace path and the name-collision check below are suppressed.
    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        if let Some(id) = maybe_item_to_drop {
            // Refuse to replace a view that the new definition reads from:
            // dropping it would leave the new view with a dangling reference.
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    // Everything transitively dependent on the replaced view is dropped with
    // it.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    validate_view_dependencies(scx, &view.dependencies.0)?;

    // Surface a name collision unless OR REPLACE / IF NOT EXISTS (or the
    // ignore flag above) applies.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
2712
2713fn validate_view_dependencies(
2715 scx: &StatementContext,
2716 dependencies: &BTreeSet<CatalogItemId>,
2717) -> Result<(), PlanError> {
2718 for id in dependencies {
2719 let item = scx.catalog.get_item(id);
2720 if item.replacement_target().is_some() {
2721 let name = scx.catalog.minimal_qualification(item.name());
2722 return Err(PlanError::InvalidDependency {
2723 name: name.to_string(),
2724 item_type: format!("replacement {}", item.item_type()),
2725 });
2726 }
2727 }
2728
2729 Ok(())
2730}
2731
/// Describes `CREATE MATERIALIZED VIEW`; DDL statements produce no result
/// rows.
pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2738
/// Describes `CREATE CONTINUAL TASK`; DDL statements produce no result rows.
pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2745
/// Describes `CREATE NETWORK POLICY`; DDL statements produce no result rows.
pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2752
/// Describes `ALTER NETWORK POLICY`; DDL statements produce no result rows.
pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2759
/// Plans `CREATE MATERIALIZED VIEW`.
///
/// Beyond planning the defining query, this resolves the target cluster (and
/// optional target replica), validates the `WITH` options (`ASSERT NOT
/// NULL`, `PARTITION BY`, `RETAIN HISTORY`, `REFRESH`), handles `OR REPLACE`
/// / `IF NOT EXISTS`, and wires up a `REPLACEMENT FOR` target when present.
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    // Record the resolved cluster so the normalized statement is explicit.
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Replica-targeted materialized views are feature-gated.
    let target_replica = match &stmt.in_cluster_replica {
        Some(replica_name) => {
            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;

            let cluster = scx.catalog.get_cluster(cluster_id);
            let replica_id = cluster
                .replica_ids()
                .get(replica_name.as_str())
                .copied()
                .ok_or_else(|| {
                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
                })?;
            Some(replica_id)
        }
        None => None,
    };

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // The defining query must have a trivial finishing; anything else is a
    // planner bug.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    // Parameters ($1, $2, ...) cannot be embedded in a persistent definition.
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    // Apply the optional user-provided column alias list.
    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    // Fold all `REFRESH ...` options into a single schedule, validating each
    // one along the way. `None` means the default (on-commit) behavior.
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    // Purification rewrites REFRESH AT CREATION; seeing it
                    // here is an internal error.
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // The refresh time must reduce to a literal timestamp.
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        // Months have no fixed duration, so they cannot form
                        // a fixed refresh period.
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            // Purification fills in a default ALIGNED TO;
                            // seeing none here is an internal error.
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // As with REFRESH AT, ALIGNED TO must be a literal.
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    // Resolve ASSERT NOT NULL column names to positions in the output
    // relation.
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // When the planning context asks to ignore IF EXISTS errors, treat any
    // conflict as a skip.
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            if let Some(id) = replace_id {
                // Refuse to replace an object that the new definition reads
                // from: dropping it would leave a dangling reference.
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    // Everything transitively dependent on the replaced object is dropped
    // with it.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Handle `REPLACEMENT FOR <target>` (feature-gated).
    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacement_for {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        // A replacement that reads from a dependent of its target would form
        // a dependency cycle.
        for dependent in scx.catalog.item_dependents(target.id()) {
            if let ObjectId::Item(id) = dependent
                && dependencies.contains(&id)
            {
                sql_bail!(
                    "replacement would cause {} to depend on itself",
                    scx.catalog.minimal_qualification(target.name()),
                );
            }
        }

        dependencies.insert(target.id());

        // Only one replacement may exist per target at a time.
        for use_id in target.used_by() {
            let use_item = scx.get_item(use_id);
            if use_item.replacement_target() == Some(target.id()) {
                sql_bail!(
                    "cannot replace {} because it already has a replacement: {}",
                    scx.catalog.minimal_qualification(target.name()),
                    scx.catalog.minimal_qualification(use_item.name()),
                );
            }
        }

        replacement_target = Some(target.id());
    }

    validate_view_dependencies(scx, &dependencies)?;

    // Surface a name collision unless OR REPLACE / IF NOT EXISTS applies.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            target_replica,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
3115
// `WITH (...)` options accepted by `CREATE MATERIALIZED VIEW`; `ASSERT NOT
// NULL` and `REFRESH` may appear multiple times.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3123
3124pub fn plan_create_continual_task(
3125 scx: &StatementContext,
3126 mut stmt: CreateContinualTaskStatement<Aug>,
3127) -> Result<Plan, PlanError> {
3128 match &stmt.sugar {
3129 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3130 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3131 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3132 }
3133 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3134 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3135 }
3136 };
3137 let cluster_id = match &stmt.in_cluster {
3138 None => scx.catalog.resolve_cluster(None)?.id(),
3139 Some(in_cluster) => in_cluster.id,
3140 };
3141 stmt.in_cluster = Some(ResolvedClusterName {
3142 id: cluster_id,
3143 print_name: None,
3144 });
3145
3146 let create_sql =
3147 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3148
3149 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3150
3151 let mut desc = match stmt.columns {
3156 None => None,
3157 Some(columns) => {
3158 let mut desc_columns = Vec::with_capacity(columns.capacity());
3159 for col in columns.iter() {
3160 desc_columns.push((
3161 normalize::column_name(col.name.clone()),
3162 SqlColumnType {
3163 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3164 nullable: false,
3165 },
3166 ));
3167 }
3168 Some(RelationDesc::from_names_and_types(desc_columns))
3169 }
3170 };
3171 let input = scx.get_item_by_resolved_name(&stmt.input)?;
3172 match input.item_type() {
3173 CatalogItemType::ContinualTask
3176 | CatalogItemType::Table
3177 | CatalogItemType::MaterializedView
3178 | CatalogItemType::Source => {}
3179 CatalogItemType::Sink
3180 | CatalogItemType::View
3181 | CatalogItemType::Index
3182 | CatalogItemType::Type
3183 | CatalogItemType::Func
3184 | CatalogItemType::Secret
3185 | CatalogItemType::Connection => {
3186 sql_bail!(
3187 "CONTINUAL TASK cannot use {} as an input",
3188 input.item_type()
3189 );
3190 }
3191 }
3192
3193 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3194 let ct_name = stmt.name;
3195 let placeholder_id = match &ct_name {
3196 ResolvedItemName::ContinualTask { id, name } => {
3197 let desc = match desc.as_ref().cloned() {
3198 Some(x) => x,
3199 None => {
3200 let desc = input.relation_desc().expect("item type checked above");
3205 desc.into_owned()
3206 }
3207 };
3208 qcx.ctes.insert(
3209 *id,
3210 CteDesc {
3211 name: name.item.clone(),
3212 desc,
3213 },
3214 );
3215 Some(*id)
3216 }
3217 _ => None,
3218 };
3219
3220 let mut exprs = Vec::new();
3221 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3222 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3223 let query::PlannedRootQuery {
3224 expr,
3225 desc: desc_query,
3226 finishing,
3227 scope: _,
3228 } = query::plan_ct_query(&mut qcx, query)?;
3229 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3232 &finishing,
3233 expr.arity()
3234 ));
3235 if expr.contains_parameters()? {
3236 if expr.contains_parameters()? {
3237 return Err(PlanError::ParameterNotAllowed(
3238 "continual tasks".to_string(),
3239 ));
3240 }
3241 }
3242 let expr = match desc.as_mut() {
3243 None => {
3244 desc = Some(desc_query);
3245 expr
3246 }
3247 Some(desc) => {
3248 if desc_query.arity() > desc.arity() {
3251 sql_bail!(
3252 "statement {}: INSERT has more expressions than target columns",
3253 idx
3254 );
3255 }
3256 if desc_query.arity() < desc.arity() {
3257 sql_bail!(
3258 "statement {}: INSERT has more target columns than expressions",
3259 idx
3260 );
3261 }
3262 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3265 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3266 let expr = expr.map_err(|e| {
3267 sql_err!(
3268 "statement {}: column {} is of type {} but expression is of type {}",
3269 idx,
3270 desc.get_name(e.column).quoted(),
3271 qcx.humanize_scalar_type(&e.target_type, false),
3272 qcx.humanize_scalar_type(&e.source_type, false),
3273 )
3274 })?;
3275
3276 let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3279 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3280 if updated {
3281 let new_types = zip_types().map(|(ct, q)| {
3282 let mut ct = ct.clone();
3283 if q.nullable {
3284 ct.nullable = true;
3285 }
3286 ct
3287 });
3288 *desc = RelationDesc::from_names_and_types(
3289 desc.iter_names().cloned().zip_eq(new_types),
3290 );
3291 }
3292
3293 expr
3294 }
3295 };
3296 match stmt {
3297 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3298 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3299 }
3300 }
3301 let expr = exprs
3304 .into_iter()
3305 .reduce(|acc, expr| acc.union(expr))
3306 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3307 let dependencies = expr
3308 .depends_on()
3309 .into_iter()
3310 .map(|gid| scx.catalog.resolve_item_id(&gid))
3311 .collect();
3312
3313 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3314 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3315 if let Some(dup) = column_names.iter().duplicates().next() {
3316 sql_bail!("column {} specified more than once", dup.quoted());
3317 }
3318
3319 let name = match &ct_name {
3321 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3322 ResolvedItemName::ContinualTask { name, .. } => {
3323 let name = scx.allocate_qualified_name(name.clone())?;
3324 let full_name = scx.catalog.resolve_full_name(&name);
3325 let partial_name = PartialItemName::from(full_name.clone());
3326 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3329 return Err(PlanError::ItemAlreadyExists {
3330 name: full_name.to_string(),
3331 item_type: item.item_type(),
3332 });
3333 }
3334 name
3335 }
3336 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3337 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3338 };
3339
3340 let as_of = stmt.as_of.map(Timestamp::from);
3341 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3342 name,
3343 placeholder_id,
3344 desc,
3345 input_id: input.global_id(),
3346 with_snapshot: snapshot.unwrap_or(true),
3347 continual_task: MaterializedView {
3348 create_sql,
3349 expr,
3350 dependencies,
3351 column_names,
3352 replacement_target: None,
3353 cluster_id,
3354 target_replica: None,
3355 non_null_assertions: Vec::new(),
3356 compaction_window: None,
3357 refresh_schedule: None,
3358 as_of,
3359 },
3360 }))
3361}
3362
3363fn continual_task_query<'a>(
3364 ct_name: &ResolvedItemName,
3365 stmt: &'a ast::ContinualTaskStmt<Aug>,
3366) -> Option<ast::Query<Aug>> {
3367 match stmt {
3368 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3369 table_name: _,
3370 columns,
3371 source,
3372 returning,
3373 }) => {
3374 if !columns.is_empty() || !returning.is_empty() {
3375 return None;
3376 }
3377 match source {
3378 ast::InsertSource::Query(query) => Some(query.clone()),
3379 ast::InsertSource::DefaultValues => None,
3380 }
3381 }
3382 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3383 table_name: _,
3384 alias,
3385 using,
3386 selection,
3387 }) => {
3388 if !using.is_empty() {
3389 return None;
3390 }
3391 let from = ast::TableWithJoins {
3393 relation: ast::TableFactor::Table {
3394 name: ct_name.clone(),
3395 alias: alias.clone(),
3396 },
3397 joins: Vec::new(),
3398 };
3399 let select = ast::Select {
3400 from: vec![from],
3401 selection: selection.clone(),
3402 distinct: None,
3403 projection: vec![ast::SelectItem::Wildcard],
3404 group_by: Vec::new(),
3405 having: None,
3406 qualify: None,
3407 options: Vec::new(),
3408 };
3409 let query = ast::Query {
3410 ctes: ast::CteBlock::Simple(Vec::new()),
3411 body: ast::SetExpr::Select(Box::new(select)),
3412 order_by: Vec::new(),
3413 limit: None,
3414 offset: None,
3415 };
3416 Some(query)
3418 }
3419 }
3420}
3421
// Derives `ContinualTaskOptionExtracted` for the options accepted by
// CREATE CONTINUAL TASK (currently only `SNAPSHOT <bool>`).
generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3423
3424pub fn describe_create_sink(
3425 _: &StatementContext,
3426 _: CreateSinkStatement<Aug>,
3427) -> Result<StatementDesc, PlanError> {
3428 Ok(StatementDesc::new(None))
3429}
3430
// Derives `CreateSinkOptionExtracted` for the WITH options accepted by
// CREATE SINK: SNAPSHOT, PARTITION STRATEGY, VERSION, and COMMIT INTERVAL.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3438
3439pub fn plan_create_sink(
3440 scx: &StatementContext,
3441 stmt: CreateSinkStatement<Aug>,
3442) -> Result<Plan, PlanError> {
3443 let Some(name) = stmt.name.clone() else {
3445 return Err(PlanError::MissingName(CatalogItemType::Sink));
3446 };
3447 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3448 let full_name = scx.catalog.resolve_full_name(&name);
3449 let partial_name = PartialItemName::from(full_name.clone());
3450 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3451 return Err(PlanError::ItemAlreadyExists {
3452 name: full_name.to_string(),
3453 item_type: item.item_type(),
3454 });
3455 }
3456
3457 plan_sink(scx, stmt)
3458}
3459
/// Plans the body of a `CREATE SINK` statement into a `Plan::CreateSink`.
///
/// Expects the caller (`plan_create_sink`) to have already performed the
/// name-collision check. This function validates the envelope/mode clause,
/// the item being sunk, the KEY and HEADERS column references, and delegates
/// connection-specific validation to `kafka_sink_builder` /
/// `iceberg_sink_builder`.
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        mode,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Kafka sinks are configured with ENVELOPE, Iceberg sinks with MODE; each
    // clause is rejected for the other connection type.
    let envelope = match (&connection, envelope, mode) {
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
            SinkEnvelope::Debezium
        }
        (CreateSinkConnection::Kafka { .. }, None, None) => {
            sql_bail!("ENVELOPE clause is required")
        }
        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
        }
        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Iceberg { .. }, None, None) => {
            sql_bail!("MODE clause is required")
        }
        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
        }
    };

    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;

    // Only relation-producing items may be sunk, and never a replacement
    // target.
    {
        use CatalogItemType::*;
        match from.item_type() {
            Table | Source | MaterializedView | ContinualTask => {
                if from.replacement_target().is_some() {
                    let name = scx.catalog.minimal_qualification(from.name());
                    return Err(PlanError::InvalidSinkFrom {
                        name: name.to_string(),
                        item_type: format!("replacement {}", from.item_type()),
                    });
                }
            }
            Sink | View | Index | Type | Func | Secret | Connection => {
                let name = scx.catalog.minimal_qualification(from.name());
                return Err(PlanError::InvalidSinkFrom {
                    name: name.to_string(),
                    item_type: from.item_type().to_string(),
                });
            }
        }
    }

    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }

    let desc = from.relation_desc().expect("item type checked above");
    // Resolve user-specified KEY columns to column indices, rejecting
    // duplicates. For upsert sinks the key must be a known unique key of the
    // relation unless KEY ... NOT ENFORCED was given, in which case only a
    // notice is emitted.
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key: Some(key), .. }
        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
            let key_columns = key
                .key_columns
                .clone()
                .into_iter()
                .map(normalize::column_name)
                .collect::<Vec<_>>();
            let mut uniq = BTreeSet::new();
            for col in key_columns.iter() {
                if !uniq.insert(col) {
                    sql_bail!("duplicate column referenced in KEY: {}", col);
                }
            }
            let indices = key_columns
                .iter()
                .map(|col| -> anyhow::Result<usize> {
                    let name_idx =
                        desc.get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        // NOTE(review): this `sql_err!` constructs an error
                        // value that is immediately dropped, so ambiguous KEY
                        // columns do not actually fail here. Presumably this
                        // was meant to be returned (e.g. `sql_bail!` or
                        // `return Err(...)`) — confirm intent.
                        sql_err!("column referenced in KEY is ambiguous: {}", col);
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;
            // The declared key is valid when it covers at least one known
            // unique key of the relation.
            let is_valid_key = desc
                .typ()
                .keys
                .iter()
                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));

            if !is_valid_key && envelope == SinkEnvelope::Upsert {
                if key.not_enforced {
                    scx.catalog
                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                            key: key_columns.clone(),
                            name: name.item.clone(),
                        })
                } else {
                    return Err(PlanError::UpsertSinkWithInvalidKey {
                        name: from_name.full_name_str(),
                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                        valid_keys: desc
                            .typ()
                            .keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    .map(|col| desc.get_name(*col).as_str().into())
                                    .collect()
                            })
                            .collect(),
                    });
                }
            }
            Some(indices)
        }
        CreateSinkConnection::Kafka { key: None, .. }
        | CreateSinkConnection::Iceberg { key: None, .. } => None,
    };

    // Resolve the HEADERS column (Kafka only): it must exist, be unambiguous,
    // and have type map[text => text] or map[text => bytea]. HEADERS is not
    // allowed together with ENVELOPE DEBEZIUM.
    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                SqlScalarType::Map { value_type, .. }
                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

    // The first known unique key of the relation, if any, independent of the
    // user-declared KEY.
    let relation_key_indices = desc.typ().keys.get(0).cloned();

    // Project the relation description down to the KEY columns, pairing it
    // with the original indices.
    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = SqlRelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
        commit_interval,
    } = with_options.try_into()?;

    // Connection-specific validation and construction of the sink connection.
    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
            commit_interval,
        )?,
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            options,
            ..
        } => iceberg_sink_builder(
            scx,
            connection,
            aws_connection,
            options,
            relation_key_indices,
            key_desc_and_indices,
            commit_interval,
        )?,
    };

    // Sinks snapshot by default and start at version 0 unless stated.
    let with_snapshot = snapshot.unwrap_or(true);
    let version = version.unwrap_or(0);

    // `source_sink_cluster_config` may fill in `stmt.in_cluster`, so it must
    // run before the statement is normalized into `create_sql`.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
            commit_interval,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}
3729
3730fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3731 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3732
3733 let existing_keys = desc
3734 .typ()
3735 .keys
3736 .iter()
3737 .map(|key_columns| {
3738 key_columns
3739 .iter()
3740 .map(|col| desc.get_name(*col).as_str())
3741 .join(", ")
3742 })
3743 .join(", ");
3744
3745 sql_err!(
3746 "Key constraint ({}) conflicts with existing key ({})",
3747 user_keys,
3748 existing_keys
3749 )
3750}
3751
/// `CSR CONFIG` options extracted from a schema-registry connection reference
/// in an Avro-formatted sink; populated by the hand-written `TryFrom` impl
/// below.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    /// Option names already seen, used to reject duplicate options.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    /// Value of `AVRO KEY FULLNAME`, if given; forwarded as the generated
    /// key schema's name.
    pub(crate) avro_key_fullname: Option<String>,
    /// Value of `AVRO VALUE FULLNAME`, if given; forwarded as the generated
    /// value schema's name.
    pub(crate) avro_value_fullname: Option<String>,
    /// Value of `NULL DEFAULTS`; forwarded to `AvroSchemaGenerator::new`.
    pub(crate) null_defaults: bool,
    /// `DOC ON` comments to attach to the generated value schema, by target.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    /// `DOC ON` comments to attach to the generated key schema, by target.
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    /// Compatibility level requested for the key schema, if any.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    /// Compatibility level requested for the value schema, if any.
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3765
/// Extracts `CSR CONFIG` options into `CsrConfigOptionExtracted`, rejecting
/// duplicate options and invalid values.
impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        // `DOC ON` comments without a KEY/VALUE qualifier apply to both
        // schemas; they are collected here and merged in afterwards so that
        // explicit KEY-only / VALUE-only comments take precedence.
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            // Each option may appear at most once.
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            // Wraps a value-conversion error with the offending option name.
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            // Parses an optional string value into a schema-registry
            // compatibility level (case-insensitive).
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    // A DOC ON option must carry a (string) value.
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    // The doc target is either a specific column of an item or
                    // a whole type.
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        // Unqualified DOC ON comments fill in any key/value doc slots not
        // already set by an explicit KEY-only or VALUE-only option.
        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}
3861
/// Validates the connections and options of an Iceberg sink and builds its
/// storage connection description.
///
/// Requires the `ENABLE_ICEBERG_SINK` feature flag, an Iceberg catalog
/// connection, an AWS connection, and the TABLE, NAMESPACE, and COMMIT
/// INTERVAL options. COMMIT INTERVAL is only validated here; `plan_sink`
/// records it on the `Sink` itself rather than on this connection.
fn iceberg_sink_builder(
    scx: &StatementContext,
    catalog_connection: ResolvedItemName,
    aws_connection: ResolvedItemName,
    options: Vec<IcebergSinkConfigOption<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
    let catalog_connection_id = catalog_connection_item.id();
    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
    let aws_connection_id = aws_connection_item.id();
    // The referenced connections must have the expected kinds.
    if !matches!(
        catalog_connection_item.connection()?,
        Connection::IcebergCatalog(_)
    ) {
        sql_bail!(
            "{} is not an iceberg catalog connection",
            scx.catalog
                .resolve_full_name(catalog_connection_item.name())
                .to_string()
                .quoted()
        );
    };

    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
        sql_bail!(
            "{} is not an AWS connection",
            scx.catalog
                .resolve_full_name(aws_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    let IcebergSinkConfigOptionExtracted {
        table,
        namespace,
        seen: _,
    }: IcebergSinkConfigOptionExtracted = options.try_into()?;

    // TABLE, NAMESPACE, and COMMIT INTERVAL are all mandatory.
    let Some(table) = table else {
        sql_bail!("Iceberg sink must specify TABLE");
    };
    let Some(namespace) = namespace else {
        sql_bail!("Iceberg sink must specify NAMESPACE");
    };
    if commit_interval.is_none() {
        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
    }

    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
        catalog_connection_id,
        catalog_connection: catalog_connection_id,
        aws_connection_id,
        aws_connection: aws_connection_id,
        table,
        namespace,
        relation_key_indices,
        key_desc_and_indices,
    }))
}
3926
/// Validates the options of a Kafka sink and builds its storage connection
/// description, including key/value formats, topic configuration, and the
/// optional PARTITION BY expression.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // The referenced connection must be a Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    // COMMIT INTERVAL is an Iceberg-only option.
    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // LEGACY IDS selects the legacy naming style for the transactional and
    // progress-group IDs and is mutually exclusive with explicit prefixes.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    // Cap the metadata refresh interval at one hour.
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    // Topic partition count and replication factor must be positive when
    // specified at all.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Validates a CSR connection reference for Avro formats: SEED / KEY
    // STRATEGY / VALUE STRATEGY are rejected for sinks, the referenced item
    // must be a schema registry connection, and AVRO KEY/VALUE FULLNAME usage
    // must be consistent with whether a KEY was declared.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // With a KEY, the two fullnames must be given together or not at all.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps a FORMAT specification to a concrete sink format for either the
    // key side (`is_key == true`) or the value side.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        // BYTES and TEXT only make sense for single-column relations.
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // Generate the Avro schema; key schemas default to the name
            // "row", value schemas to "envelope".
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional PARTITION BY expression against the sink's value
    // relation; under ENVELOPE DEBEZIUM, referencing a non-key column is an
    // error.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression must be castable to uint8 and free of
            // correlated references.
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // A bare FORMAT applies to both key and value; KEY FORMAT ... VALUE
    // FORMAT sets them independently. The key format is only planned when a
    // KEY was declared.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4212
4213pub fn describe_create_index(
4214 _: &StatementContext,
4215 _: CreateIndexStatement<Aug>,
4216) -> Result<StatementDesc, PlanError> {
4217 Ok(StatementDesc::new(None))
4218}
4219
/// Plans a `CREATE INDEX` statement into a `Plan::CreateIndex`.
///
/// Mutates the statement in place (resolved cluster, derived index name, and
/// filled-in key expressions) before normalizing it into `create_sql`, so the
/// persisted statement is fully explicit.
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    // Indexes may only be built on relation-producing items that are not
    // replacement targets.
    {
        use CatalogItemType::*;
        match on.item_type() {
            Table | Source | View | MaterializedView | ContinualTask => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on.relation_desc().expect("item type checked above");

    // When no key was given, default to the relation's default key, expressed
    // as column references: by name when unambiguous, otherwise by 1-based
    // position.
    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    // Derive the index name: use the explicit one if given; otherwise
    // synthesize `<item>_primary_idx` (default key) or `<item>_<cols>_idx`
    // (explicit key), then — unless IF NOT EXISTS — uniquify it against the
    // catalog.
    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            idx_name.item += "_primary_idx";
        } else {
            // Name each key component after its column when possible, falling
            // back to its position or "expr" for non-column expressions.
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    // Report a collision unless IF NOT EXISTS was given or the planning
    // context asks to ignore such errors.
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    // Write the resolved cluster, derived name, and filled-in key expressions
    // back into the statement so they are captured by `create_sql`.
    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    // Extract the RETAIN HISTORY option, if present, as the index's
    // compaction window.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}
4373
4374pub fn describe_create_type(
4375 _: &StatementContext,
4376 _: CreateTypeStatement<Aug>,
4377) -> Result<StatementDesc, PlanError> {
4378 Ok(StatementDesc::new(None))
4379}
4380
/// Plans `CREATE TYPE ... AS LIST | MAP | (record)`.
///
/// Each referenced element/key/value/field type must be a *named* catalog
/// type; anonymous types (e.g. inline `list`s) are rejected so the new type
/// has stable, resolvable references. The plan fails if an item whose type
/// class conflicts with types already exists under the same name.
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Normalize the statement text first so the stored `create_sql` reflects
    // the statement exactly as planned.
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    // Validates that `data_type` is a named, non-`char` catalog type and that
    // its modifiers are valid; returns the catalog id plus modifiers.
    //
    // `as_type` is interpolated directly before the word "option", so callers
    // pass it with a trailing space ("LIST ", "MAP ") or empty for records.
    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            // The referenced item exists but is not a type (e.g. a table).
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            // `char` inside containers is explicitly unsupported.
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate the modifiers eagerly; the result itself is unused.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                // Record fields have no "AS <kind>" prefix in the error text.
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Types share a namespace with some other item classes; only error if the
    // existing item actually conflicts with a type.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}
4497
// Extracts the `ELEMENT TYPE` option of `CREATE TYPE ... AS LIST`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Extracts the `KEY TYPE` / `VALUE TYPE` options of `CREATE TYPE ... AS MAP`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4505
/// A planned `ALTER ROLE` action: either a change to the role's attributes or
/// a change to one of the role's session-variable defaults.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Update role attributes (e.g. `INHERIT`, `PASSWORD`, `SUPERUSER`).
    Attributes(PlannedRoleAttributes),
    /// Set or reset a session variable default for the role.
    Variable(PlannedRoleVariable),
}
4511
/// Role attributes resulting from planning `CREATE ROLE` / `ALTER ROLE`
/// options. Each field is `None` when the corresponding option was not given,
/// so an `ALTER` only touches attributes the user explicitly mentioned.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// Whether the role inherits privileges of roles it is a member of.
    pub inherit: Option<bool>,
    /// New password, if `PASSWORD '...'` was specified.
    pub password: Option<Password>,
    /// SCRAM iteration count captured at planning time alongside `password`.
    pub scram_iterations: Option<NonZeroU32>,
    /// Set when `PASSWORD NULL` was specified (clear the password).
    pub nopassword: Option<bool>,
    /// `SUPERUSER` / `NOSUPERUSER`.
    pub superuser: Option<bool>,
    /// `LOGIN` / `NOLOGIN`.
    pub login: Option<bool>,
}
4524
4525fn plan_role_attributes(
4526 options: Vec<RoleAttribute>,
4527 scx: &StatementContext,
4528) -> Result<PlannedRoleAttributes, PlanError> {
4529 let mut planned_attributes = PlannedRoleAttributes {
4530 inherit: None,
4531 password: None,
4532 scram_iterations: None,
4533 superuser: None,
4534 login: None,
4535 nopassword: None,
4536 };
4537
4538 for option in options {
4539 match option {
4540 RoleAttribute::Inherit | RoleAttribute::NoInherit
4541 if planned_attributes.inherit.is_some() =>
4542 {
4543 sql_bail!("conflicting or redundant options");
4544 }
4545 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4546 bail_never_supported!(
4547 "CREATECLUSTER attribute",
4548 "sql/create-role/#details",
4549 "Use system privileges instead."
4550 );
4551 }
4552 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4553 bail_never_supported!(
4554 "CREATEDB attribute",
4555 "sql/create-role/#details",
4556 "Use system privileges instead."
4557 );
4558 }
4559 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4560 bail_never_supported!(
4561 "CREATEROLE attribute",
4562 "sql/create-role/#details",
4563 "Use system privileges instead."
4564 );
4565 }
4566 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4567 sql_bail!("conflicting or redundant options");
4568 }
4569
4570 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4571 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4572 RoleAttribute::Password(password) => {
4573 if let Some(password) = password {
4574 planned_attributes.password = Some(password.into());
4575 planned_attributes.scram_iterations =
4576 Some(scx.catalog.system_vars().scram_iterations())
4577 } else {
4578 planned_attributes.nopassword = Some(true);
4579 }
4580 }
4581 RoleAttribute::SuperUser => {
4582 if planned_attributes.superuser == Some(false) {
4583 sql_bail!("conflicting or redundant options");
4584 }
4585 planned_attributes.superuser = Some(true);
4586 }
4587 RoleAttribute::NoSuperUser => {
4588 if planned_attributes.superuser == Some(true) {
4589 sql_bail!("conflicting or redundant options");
4590 }
4591 planned_attributes.superuser = Some(false);
4592 }
4593 RoleAttribute::Login => {
4594 if planned_attributes.login == Some(false) {
4595 sql_bail!("conflicting or redundant options");
4596 }
4597 planned_attributes.login = Some(true);
4598 }
4599 RoleAttribute::NoLogin => {
4600 if planned_attributes.login == Some(true) {
4601 sql_bail!("conflicting or redundant options");
4602 }
4603 planned_attributes.login = Some(false);
4604 }
4605 }
4606 }
4607 if planned_attributes.inherit == Some(false) {
4608 bail_unsupported!("non inherit roles");
4609 }
4610
4611 Ok(planned_attributes)
4612}
4613
/// A planned change to a role's session-variable default.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// `ALTER ROLE ... SET name = value`.
    Set { name: String, value: VariableValue },
    /// `ALTER ROLE ... RESET name`.
    Reset { name: String },
}
4619
4620impl PlannedRoleVariable {
4621 pub fn name(&self) -> &str {
4622 match self {
4623 PlannedRoleVariable::Set { name, .. } => name,
4624 PlannedRoleVariable::Reset { name } => name,
4625 }
4626 }
4627}
4628
4629fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4630 let plan = match variable {
4631 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4632 name: name.to_string(),
4633 value: scl::plan_set_variable_to(value)?,
4634 },
4635 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4636 name: name.to_string(),
4637 },
4638 };
4639 Ok(plan)
4640}
4641
4642pub fn describe_create_role(
4643 _: &StatementContext,
4644 _: CreateRoleStatement,
4645) -> Result<StatementDesc, PlanError> {
4646 Ok(StatementDesc::new(None))
4647}
4648
4649pub fn plan_create_role(
4650 scx: &StatementContext,
4651 CreateRoleStatement { name, options }: CreateRoleStatement,
4652) -> Result<Plan, PlanError> {
4653 let attributes = plan_role_attributes(options, scx)?;
4654 Ok(Plan::CreateRole(CreateRolePlan {
4655 name: normalize::ident(name),
4656 attributes: attributes.into(),
4657 }))
4658}
4659
4660pub fn plan_create_network_policy(
4661 ctx: &StatementContext,
4662 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4663) -> Result<Plan, PlanError> {
4664 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4665 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4666
4667 let Some(rule_defs) = policy_options.rules else {
4668 sql_bail!("RULES must be specified when creating network policies.");
4669 };
4670
4671 let mut rules = vec![];
4672 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4673 let NetworkPolicyRuleOptionExtracted {
4674 seen: _,
4675 direction,
4676 action,
4677 address,
4678 } = options.try_into()?;
4679 let (direction, action, address) = match (direction, action, address) {
4680 (Some(direction), Some(action), Some(address)) => (
4681 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4682 NetworkPolicyRuleAction::try_from(action.as_str())?,
4683 PolicyAddress::try_from(address.as_str())?,
4684 ),
4685 (_, _, _) => {
4686 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4687 }
4688 };
4689 rules.push(NetworkPolicyRule {
4690 name: normalize::ident(name),
4691 direction,
4692 action,
4693 address,
4694 });
4695 }
4696
4697 if rules.len()
4698 > ctx
4699 .catalog
4700 .system_vars()
4701 .max_rules_per_network_policy()
4702 .try_into()?
4703 {
4704 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4705 }
4706
4707 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4708 name: normalize::ident(name),
4709 rules,
4710 }))
4711}
4712
4713pub fn plan_alter_network_policy(
4714 ctx: &StatementContext,
4715 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4716) -> Result<Plan, PlanError> {
4717 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4718
4719 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4720 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4721
4722 let Some(rule_defs) = policy_options.rules else {
4723 sql_bail!("RULES must be specified when creating network policies.");
4724 };
4725
4726 let mut rules = vec![];
4727 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4728 let NetworkPolicyRuleOptionExtracted {
4729 seen: _,
4730 direction,
4731 action,
4732 address,
4733 } = options.try_into()?;
4734
4735 let (direction, action, address) = match (direction, action, address) {
4736 (Some(direction), Some(action), Some(address)) => (
4737 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4738 NetworkPolicyRuleAction::try_from(action.as_str())?,
4739 PolicyAddress::try_from(address.as_str())?,
4740 ),
4741 (_, _, _) => {
4742 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4743 }
4744 };
4745 rules.push(NetworkPolicyRule {
4746 name: normalize::ident(name),
4747 direction,
4748 action,
4749 address,
4750 });
4751 }
4752 if rules.len()
4753 > ctx
4754 .catalog
4755 .system_vars()
4756 .max_rules_per_network_policy()
4757 .try_into()?
4758 {
4759 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4760 }
4761
4762 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4763 id: policy.id(),
4764 name: normalize::ident(name),
4765 rules,
4766 }))
4767}
4768
4769pub fn describe_create_cluster(
4770 _: &StatementContext,
4771 _: CreateClusterStatement<Aug>,
4772) -> Result<StatementDesc, PlanError> {
4773 Ok(StatementDesc::new(None))
4774}
4775
// Options accepted by `CREATE CLUSTER` / `ALTER CLUSTER`.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Options accepted by `CREATE NETWORK POLICY` / `ALTER NETWORK POLICY`.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Options accepted inside each network policy rule definition.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// Options accepted by `ALTER CLUSTER ... WITH (...)`.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// Options accepted by `ALTER CLUSTER ... WAIT UNTIL READY (...)`.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Optimizer feature overrides settable via `CREATE CLUSTER ... FEATURES (...)`
// (system users only; see `plan_create_cluster_inner`).
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4829
/// Plans `CREATE CLUSTER`.
///
/// For managed clusters, additionally performs a round-trip sanity check:
/// the plan is unplanned back into SQL, re-parsed, re-resolved, and re-planned,
/// and the result must equal the original plan. This guards `unplan_create_cluster`
/// (used by e.g. SHOW CREATE) against drifting out of sync with planning.
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    // Roundtrip check only applies to managed clusters; unmanaged clusters
    // cannot be unplanned (see `unplan_create_cluster`).
    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}
4869
/// Plans a `CREATE CLUSTER` statement into a [`CreateClusterPlan`], without
/// the replan round-trip check performed by [`plan_create_cluster`].
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // If MANAGED is not given explicitly, infer it: the cluster is managed
    // unless an explicit REPLICAS list was provided.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system users.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        // Managed clusters: SIZE is mandatory; REPLICAS is not allowed.
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // Modern ("cc") sizes always run with disk, making the option
            // meaningless there; elsewhere it is merely deprecated.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        // REPLICATION FACTOR only applies to manually scheduled clusters; a
        // REFRESH-scheduled cluster starts at 0 replicas and is scaled by the
        // system.
        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        // Explicit AZ placement for managed clusters is feature-gated.
        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Translate FEATURES (system users only; checked above) into
        // optimizer feature overrides.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        // Unmanaged clusters: replicas are given explicitly; every
        // managed-cluster option is rejected.
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
5036
/// Converts a [`CreateClusterPlan`] back into a `CREATE CLUSTER` statement.
///
/// Inverse of [`plan_create_cluster_inner`] for managed clusters; used by the
/// replan round-trip check in [`plan_create_cluster`]. Unmanaged clusters are
/// not supported.
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Exhaustively destructure the overrides so adding a field to
            // `OptimizerFeatureOverrides` forces this function to be revisited.
            // Fields bound to `_` have no corresponding cluster FEATURE.
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                // `seen` is ignored when generating option values.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // An empty AZ list means "no AZ option was specified".
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // REPLICATION FACTOR is only emitted for MANUAL schedules; planning
            // forbids it for REFRESH schedules (where it is system-controlled).
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // `seen` is ignored when generating option values.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
5137
// Options accepted by replica definitions in `CREATE CLUSTER REPLICA` and
// unmanaged `CREATE CLUSTER ... REPLICAS (...)`. The `*Addresses` options are
// for unorchestrated (feature-gated) replicas only.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
5153
/// Plans the options of a single replica definition into a [`ReplicaConfig`].
///
/// Dispatches on which options were provided: `SIZE` (plus optional AZ and
/// billing options) yields an orchestrated replica, while the feature-gated
/// `STORAGECTL ADDRESSES` + `COMPUTECTL ADDRESSES` pair yields an
/// unorchestrated one. Mixing the two families is an error.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    // The tuple match distinguishes orchestrated (SIZE-based) from
    // unorchestrated (address-based) replicas and rejects mixtures.
    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        // Orchestrated replica: SIZE given, no raw addresses.
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // Modern ("cc") sizes always run with disk; elsewhere the
                // option is deprecated.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        // Unorchestrated replica: raw process addresses, feature-gated.
        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            // Each process hosts one storagectl and one computectl endpoint.
            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5246
5247fn plan_compute_replica_config(
5251 introspection_interval: Option<OptionalDuration>,
5252 introspection_debugging: bool,
5253) -> Result<ComputeReplicaConfig, PlanError> {
5254 let introspection_interval = introspection_interval
5255 .map(|OptionalDuration(i)| i)
5256 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5257 let introspection = match introspection_interval {
5258 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5259 interval,
5260 debugging: introspection_debugging,
5261 }),
5262 None if introspection_debugging => {
5263 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5264 }
5265 None => None,
5266 };
5267 let compute = ComputeReplicaConfig { introspection };
5268 Ok(compute)
5269}
5270
5271fn unplan_compute_replica_config(
5275 compute_replica_config: ComputeReplicaConfig,
5276) -> (Option<OptionalDuration>, bool) {
5277 match compute_replica_config.introspection {
5278 Some(ComputeReplicaIntrospectionConfig {
5279 debugging,
5280 interval,
5281 }) => (Some(OptionalDuration(Some(interval))), debugging),
5282 None => (Some(OptionalDuration(None)), false),
5283 }
5284}
5285
5286fn plan_cluster_schedule(
5290 schedule: ClusterScheduleOptionValue,
5291) -> Result<ClusterSchedule, PlanError> {
5292 Ok(match schedule {
5293 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5294 ClusterScheduleOptionValue::Refresh {
5296 hydration_time_estimate: None,
5297 } => ClusterSchedule::Refresh {
5298 hydration_time_estimate: Duration::from_millis(0),
5299 },
5300 ClusterScheduleOptionValue::Refresh {
5302 hydration_time_estimate: Some(interval_value),
5303 } => {
5304 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5305 if interval.as_microseconds() < 0 {
5306 sql_bail!(
5307 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5308 interval
5309 );
5310 }
5311 if interval.months != 0 {
5312 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5316 }
5317 let duration = interval.duration()?;
5318 if u64::try_from(duration.as_millis()).is_err()
5319 || Interval::from_duration(&duration).is_err()
5320 {
5321 sql_bail!("HYDRATION TIME ESTIMATE too large");
5322 }
5323 ClusterSchedule::Refresh {
5324 hydration_time_estimate: duration,
5325 }
5326 }
5327 })
5328}
5329
5330fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5334 match schedule {
5335 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5336 ClusterSchedule::Refresh {
5337 hydration_time_estimate,
5338 } => {
5339 let interval = Interval::from_duration(&hydration_time_estimate)
5340 .expect("planning ensured that this is convertible back to Interval");
5341 let interval_value = literal::unplan_interval(&interval);
5342 ClusterScheduleOptionValue::Refresh {
5343 hydration_time_estimate: Some(interval_value),
5344 }
5345 }
5346 }
5347}
5348
5349pub fn describe_create_cluster_replica(
5350 _: &StatementContext,
5351 _: CreateClusterReplicaStatement<Aug>,
5352) -> Result<StatementDesc, PlanError> {
5353 Ok(StatementDesc::new(None))
5354}
5355
5356pub fn plan_create_cluster_replica(
5357 scx: &StatementContext,
5358 CreateClusterReplicaStatement {
5359 definition: ReplicaDefinition { name, options },
5360 of_cluster,
5361 }: CreateClusterReplicaStatement<Aug>,
5362) -> Result<Plan, PlanError> {
5363 let cluster = scx
5364 .catalog
5365 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5366 let current_replica_count = cluster.replica_ids().iter().count();
5367 if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5368 let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5369 return Err(PlanError::CreateReplicaFailStorageObjects {
5370 current_replica_count,
5371 internal_replica_count,
5372 hypothetical_replica_count: current_replica_count + 1,
5373 });
5374 }
5375
5376 let config = plan_replica_config(scx, options)?;
5377
5378 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5379 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5380 return Err(PlanError::MangedReplicaName(name.into_string()));
5381 }
5382 } else {
5383 ensure_cluster_is_not_managed(scx, cluster.id())?;
5384 }
5385
5386 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5387 name: normalize::ident(name),
5388 cluster_id: cluster.id(),
5389 config,
5390 }))
5391}
5392
5393pub fn describe_create_secret(
5394 _: &StatementContext,
5395 _: CreateSecretStatement<Aug>,
5396) -> Result<StatementDesc, PlanError> {
5397 Ok(StatementDesc::new(None))
5398}
5399
5400pub fn plan_create_secret(
5401 scx: &StatementContext,
5402 stmt: CreateSecretStatement<Aug>,
5403) -> Result<Plan, PlanError> {
5404 let CreateSecretStatement {
5405 name,
5406 if_not_exists,
5407 value,
5408 } = &stmt;
5409
5410 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5411 let mut create_sql_statement = stmt.clone();
5412 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5413 let create_sql =
5414 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5415 let secret_as = query::plan_secret_as(scx, value.clone())?;
5416
5417 let secret = Secret {
5418 create_sql,
5419 secret_as,
5420 };
5421
5422 Ok(Plan::CreateSecret(CreateSecretPlan {
5423 name,
5424 secret,
5425 if_not_exists: *if_not_exists,
5426 }))
5427}
5428
5429pub fn describe_create_connection(
5430 _: &StatementContext,
5431 _: CreateConnectionStatement<Aug>,
5432) -> Result<StatementDesc, PlanError> {
5433 Ok(StatementDesc::new(None))
5434}
5435
// Extracts the `VALIDATE` option of `CREATE CONNECTION ... WITH (...)`.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5437
/// Plans `CREATE CONNECTION`.
///
/// Determines whether to validate the connection (explicit `VALIDATE` option,
/// else the system default combined with the connection type's preference),
/// checks for name collisions, and — for SSH connections — injects the
/// generated public keys into the recorded statement before normalizing the
/// stored `create_sql`.
pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // An explicit VALIDATE option is feature-gated.
    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    // Default validation: system flag AND the connection type's own default.
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Check for an existing item with the same name (unless IF NOT EXISTS).
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, replace any user-supplied public-key options with
    // the freshly generated keys so the stored statement reflects reality.
    // This mutation must happen before `create_statement` below.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}
5505
5506fn plan_drop_database(
5507 scx: &StatementContext,
5508 if_exists: bool,
5509 name: &UnresolvedDatabaseName,
5510 cascade: bool,
5511) -> Result<Option<DatabaseId>, PlanError> {
5512 Ok(match resolve_database(scx, name, if_exists)? {
5513 Some(database) => {
5514 if !cascade && database.has_schemas() {
5515 sql_bail!(
5516 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5517 name,
5518 );
5519 }
5520 Some(database.id())
5521 }
5522 None => None,
5523 })
5524}
5525
5526pub fn describe_drop_objects(
5527 _: &StatementContext,
5528 _: DropObjectsStatement,
5529) -> Result<StatementDesc, PlanError> {
5530 Ok(StatementDesc::new(None))
5531}
5532
5533pub fn plan_drop_objects(
5534 scx: &mut StatementContext,
5535 DropObjectsStatement {
5536 object_type,
5537 if_exists,
5538 names,
5539 cascade,
5540 }: DropObjectsStatement,
5541) -> Result<Plan, PlanError> {
5542 assert_ne!(
5543 object_type,
5544 mz_sql_parser::ast::ObjectType::Func,
5545 "rejected in parser"
5546 );
5547 let object_type = object_type.into();
5548
5549 let mut referenced_ids = Vec::new();
5550 for name in names {
5551 let id = match &name {
5552 UnresolvedObjectName::Cluster(name) => {
5553 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5554 }
5555 UnresolvedObjectName::ClusterReplica(name) => {
5556 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5557 }
5558 UnresolvedObjectName::Database(name) => {
5559 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5560 }
5561 UnresolvedObjectName::Schema(name) => {
5562 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5563 }
5564 UnresolvedObjectName::Role(name) => {
5565 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5566 }
5567 UnresolvedObjectName::Item(name) => {
5568 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5569 .map(ObjectId::Item)
5570 }
5571 UnresolvedObjectName::NetworkPolicy(name) => {
5572 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5573 }
5574 };
5575 match id {
5576 Some(id) => referenced_ids.push(id),
5577 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5578 name: name.to_ast_string_simple(),
5579 object_type,
5580 }),
5581 }
5582 }
5583 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5584
5585 Ok(Plan::DropObjects(DropObjectsPlan {
5586 referenced_ids,
5587 drop_ids,
5588 object_type,
5589 }))
5590}
5591
5592fn plan_drop_schema(
5593 scx: &StatementContext,
5594 if_exists: bool,
5595 name: &UnresolvedSchemaName,
5596 cascade: bool,
5597) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5598 let normalized = normalize::unresolved_schema_name(name.clone())?;
5602 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5603 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5604 }
5605
5606 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5607 Some((database_spec, schema_spec)) => {
5608 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5609 sql_bail!(
5610 "cannot drop schema {name} because it is required by the database system",
5611 );
5612 }
5613 if let SchemaSpecifier::Temporary = schema_spec {
5614 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5615 }
5616 let schema = scx.get_schema(&database_spec, &schema_spec);
5617 if !cascade && schema.has_items() {
5618 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5619 sql_bail!(
5620 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5621 full_schema_name
5622 );
5623 }
5624 Some((database_spec, schema_spec))
5625 }
5626 None => None,
5627 })
5628}
5629
5630fn plan_drop_role(
5631 scx: &StatementContext,
5632 if_exists: bool,
5633 name: &Ident,
5634) -> Result<Option<RoleId>, PlanError> {
5635 match scx.catalog.resolve_role(name.as_str()) {
5636 Ok(role) => {
5637 let id = role.id();
5638 if &id == scx.catalog.active_role_id() {
5639 sql_bail!("current role cannot be dropped");
5640 }
5641 for role in scx.catalog.get_roles() {
5642 for (member_id, grantor_id) in role.membership() {
5643 if &id == grantor_id {
5644 let member_role = scx.catalog.get_role(member_id);
5645 sql_bail!(
5646 "cannot drop role {}: still depended up by membership of role {} in role {}",
5647 name.as_str(),
5648 role.name(),
5649 member_role.name()
5650 );
5651 }
5652 }
5653 }
5654 Ok(Some(role.id()))
5655 }
5656 Err(_) if if_exists => Ok(None),
5657 Err(e) => Err(e.into()),
5658 }
5659}
5660
5661fn plan_drop_cluster(
5662 scx: &StatementContext,
5663 if_exists: bool,
5664 name: &Ident,
5665 cascade: bool,
5666) -> Result<Option<ClusterId>, PlanError> {
5667 Ok(match resolve_cluster(scx, name, if_exists)? {
5668 Some(cluster) => {
5669 if !cascade && !cluster.bound_objects().is_empty() {
5670 return Err(PlanError::DependentObjectsStillExist {
5671 object_type: "cluster".to_string(),
5672 object_name: cluster.name().to_string(),
5673 dependents: Vec::new(),
5674 });
5675 }
5676 Some(cluster.id())
5677 }
5678 None => None,
5679 })
5680}
5681
5682fn plan_drop_network_policy(
5683 scx: &StatementContext,
5684 if_exists: bool,
5685 name: &Ident,
5686) -> Result<Option<NetworkPolicyId>, PlanError> {
5687 match scx.catalog.resolve_network_policy(name.as_str()) {
5688 Ok(policy) => {
5689 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5692 Err(PlanError::NetworkPolicyInUse)
5693 } else {
5694 Ok(Some(policy.id()))
5695 }
5696 }
5697 Err(_) if if_exists => Ok(None),
5698 Err(e) => Err(e.into()),
5699 }
5700}
5701
5702fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5705 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5707 false
5708 } else {
5709 cluster.bound_objects().iter().any(|id| {
5711 let item = scx.catalog.get_item(id);
5712 matches!(
5713 item.item_type(),
5714 CatalogItemType::Sink | CatalogItemType::Source
5715 )
5716 })
5717 }
5718}
5719
5720fn plan_drop_cluster_replica(
5721 scx: &StatementContext,
5722 if_exists: bool,
5723 name: &QualifiedReplica,
5724) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5725 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5726 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5727}
5728
5729fn plan_drop_item(
5731 scx: &StatementContext,
5732 object_type: ObjectType,
5733 if_exists: bool,
5734 name: UnresolvedItemName,
5735 cascade: bool,
5736) -> Result<Option<CatalogItemId>, PlanError> {
5737 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5738 Ok(r) => r,
5739 Err(PlanError::MismatchedObjectType {
5741 name,
5742 is_type: ObjectType::MaterializedView,
5743 expected_type: ObjectType::View,
5744 }) => {
5745 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5746 }
5747 e => e?,
5748 };
5749
5750 Ok(match resolved {
5751 Some(catalog_item) => {
5752 if catalog_item.id().is_system() {
5753 sql_bail!(
5754 "cannot drop {} {} because it is required by the database system",
5755 catalog_item.item_type(),
5756 scx.catalog.minimal_qualification(catalog_item.name()),
5757 );
5758 }
5759
5760 if !cascade {
5761 for id in catalog_item.used_by() {
5762 let dep = scx.catalog.get_item(id);
5763 if dependency_prevents_drop(object_type, dep) {
5764 return Err(PlanError::DependentObjectsStillExist {
5765 object_type: catalog_item.item_type().to_string(),
5766 object_name: scx
5767 .catalog
5768 .minimal_qualification(catalog_item.name())
5769 .to_string(),
5770 dependents: vec![(
5771 dep.item_type().to_string(),
5772 scx.catalog.minimal_qualification(dep.name()).to_string(),
5773 )],
5774 });
5775 }
5776 }
5777 }
5780 Some(catalog_item.id())
5781 }
5782 None => None,
5783 })
5784}
5785
5786fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5788 match object_type {
5789 ObjectType::Type => true,
5790 _ => match dep.item_type() {
5791 CatalogItemType::Func
5792 | CatalogItemType::Table
5793 | CatalogItemType::Source
5794 | CatalogItemType::View
5795 | CatalogItemType::MaterializedView
5796 | CatalogItemType::Sink
5797 | CatalogItemType::Type
5798 | CatalogItemType::Secret
5799 | CatalogItemType::Connection
5800 | CatalogItemType::ContinualTask => true,
5801 CatalogItemType::Index => false,
5802 },
5803 }
5804}
5805
5806pub fn describe_alter_index_options(
5807 _: &StatementContext,
5808 _: AlterIndexStatement<Aug>,
5809) -> Result<StatementDesc, PlanError> {
5810 Ok(StatementDesc::new(None))
5811}
5812
5813pub fn describe_drop_owned(
5814 _: &StatementContext,
5815 _: DropOwnedStatement<Aug>,
5816) -> Result<StatementDesc, PlanError> {
5817 Ok(StatementDesc::new(None))
5818}
5819
/// Plans `DROP OWNED BY <roles>`.
///
/// Walks the whole catalog (replicas, clusters, items, schemas, databases,
/// network policies) collecting every object owned by one of the target
/// roles, every privilege granted to one of them, and every default
/// privilege mentioning one of them, so all can be dropped/revoked in a
/// single plan. Without CASCADE, ownership of an object whose dependents are
/// owned by *other* roles is an error.
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    // For one object, queue a revoke for every privilege whose grantee is
    // one of the target roles.
    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Cluster replicas owned by the target roles.
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters: owned ones are dropped; without CASCADE, bound objects owned
    // by other roles block the drop.
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items: owned ones are dropped; without CASCADE, both the item's own
    // dependents and those of its progress item (if any) are checked for
    // drop-blocking, non-owned entries.
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string()
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas: temporary schemas are skipped entirely (neither dropped nor
    // revoked).
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases: without CASCADE, any contained schema owned by another role
    // blocks the drop.
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Network policies owned by the target roles.
    for network_policy in scx.catalog.get_network_policies() {
        if role_ids.contains(&network_policy.owner_id()) {
            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
        }
        update_privilege_revokes(
            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
            network_policy.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System-level (non-object) privileges.
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    // Default privileges are revoked when the target role is either the
    // defining role or the grantee.
    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    // Expand to the transitive dependent set, then refuse the whole plan if
    // any system object would be caught in it.
    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}
6059
6060fn plan_retain_history_option(
6061 scx: &StatementContext,
6062 retain_history: Option<OptionalDuration>,
6063) -> Result<Option<CompactionWindow>, PlanError> {
6064 if let Some(OptionalDuration(lcw)) = retain_history {
6065 Ok(Some(plan_retain_history(scx, lcw)?))
6066 } else {
6067 Ok(None)
6068 }
6069}
6070
6071fn plan_retain_history(
6077 scx: &StatementContext,
6078 lcw: Option<Duration>,
6079) -> Result<CompactionWindow, PlanError> {
6080 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
6081 match lcw {
6082 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
6087 option_name: "RETAIN HISTORY".to_string(),
6088 err: Box::new(PlanError::Unstructured(
6089 "internal error: unexpectedly zero".to_string(),
6090 )),
6091 }),
6092 Some(duration) => {
6093 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
6096 && scx
6097 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6098 .is_err()
6099 {
6100 return Err(PlanError::RetainHistoryLow {
6101 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
6102 });
6103 }
6104 Ok(duration.try_into()?)
6105 }
6106 None => {
6109 if scx
6110 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6111 .is_err()
6112 {
6113 Err(PlanError::RetainHistoryRequired)
6114 } else {
6115 Ok(CompactionWindow::DisableCompaction)
6116 }
6117 }
6118 }
6119}
6120
// Derives `IndexOptionExtracted` for index WITH options (currently only
// `RETAIN HISTORY`).
generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6122
6123fn plan_index_options(
6124 scx: &StatementContext,
6125 with_opts: Vec<IndexOption<Aug>>,
6126) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6127 if !with_opts.is_empty() {
6128 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6130 }
6131
6132 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6133
6134 let mut out = Vec::with_capacity(1);
6135 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6136 out.push(crate::plan::IndexOption::RetainHistory(cw));
6137 }
6138 Ok(out)
6139}
6140
// Derives `TableOptionExtracted` for table WITH options.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
6147
6148fn plan_table_options(
6149 scx: &StatementContext,
6150 desc: &RelationDesc,
6151 with_opts: Vec<TableOption<Aug>>,
6152) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6153 let TableOptionExtracted {
6154 partition_by,
6155 retain_history,
6156 redacted_test,
6157 ..
6158 }: TableOptionExtracted = with_opts.try_into()?;
6159
6160 if let Some(partition_by) = partition_by {
6161 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6162 check_partition_by(desc, partition_by)?;
6163 }
6164
6165 if redacted_test.is_some() {
6166 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6167 }
6168
6169 let mut out = Vec::with_capacity(1);
6170 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6171 out.push(crate::plan::TableOption::RetainHistory(cw));
6172 }
6173 Ok(out)
6174}
6175
6176pub fn plan_alter_index_options(
6177 scx: &mut StatementContext,
6178 AlterIndexStatement {
6179 index_name,
6180 if_exists,
6181 action,
6182 }: AlterIndexStatement<Aug>,
6183) -> Result<Plan, PlanError> {
6184 let object_type = ObjectType::Index;
6185 match action {
6186 AlterIndexAction::ResetOptions(options) => {
6187 let mut options = options.into_iter();
6188 if let Some(opt) = options.next() {
6189 match opt {
6190 IndexOptionName::RetainHistory => {
6191 if options.next().is_some() {
6192 sql_bail!("RETAIN HISTORY must be only option");
6193 }
6194 return alter_retain_history(
6195 scx,
6196 object_type,
6197 if_exists,
6198 UnresolvedObjectName::Item(index_name),
6199 None,
6200 );
6201 }
6202 }
6203 }
6204 sql_bail!("expected option");
6205 }
6206 AlterIndexAction::SetOptions(options) => {
6207 let mut options = options.into_iter();
6208 if let Some(opt) = options.next() {
6209 match opt.name {
6210 IndexOptionName::RetainHistory => {
6211 if options.next().is_some() {
6212 sql_bail!("RETAIN HISTORY must be only option");
6213 }
6214 return alter_retain_history(
6215 scx,
6216 object_type,
6217 if_exists,
6218 UnresolvedObjectName::Item(index_name),
6219 opt.value,
6220 );
6221 }
6222 }
6223 }
6224 sql_bail!("expected option");
6225 }
6226 }
6227}
6228
6229pub fn describe_alter_cluster_set_options(
6230 _: &StatementContext,
6231 _: AlterClusterStatement<Aug>,
6232) -> Result<StatementDesc, PlanError> {
6233 Ok(StatementDesc::new(None))
6234}
6235
/// Plans `ALTER CLUSTER <name> SET (...) [WITH (...)]` and
/// `ALTER CLUSTER <name> RESET (...)`.
///
/// Validates the requested option set against the cluster's managed/unmanaged
/// state and its schedule, then records each change as an
/// [`AlterOptionParameter`] in the resulting plan.
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Missing cluster + IF EXISTS: notice and a no-op plan.
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            // WORKLOAD CLASS is a system-only knob.
            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            // Validation differs by the *resulting* managed state: an
            // explicit MANAGED option wins, otherwise the current state.
            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    // Zero-downtime reconfiguration strategies (anything
                    // other than None) are feature gated.
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    // Non-MANUAL schedules are feature gated.
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        // REPLICATION FACTOR is incompatible with any
                        // non-MANUAL schedule, requested or current.
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        // Reject replica counts > 1 when the cluster hosts
                        // objects that require a single replica.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // A reconfiguration strategy temporarily doubles the
                        // replica set, which single-replica objects forbid.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    // Unmanaged clusters accept none of the managed-only
                    // options.
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    // Switching to unmanaged requires explicitly resetting a
                    // non-MANUAL schedule.
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            // Plan each supplied replica definition (unmanaged clusters).
            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            // Record every explicitly supplied option as a Set parameter.
            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            // DISK is deprecated: reject it for modern ("cc") sizes and warn
            // otherwise. Note it is deliberately never recorded in `options`.
            if disk.is_some() {
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            // WORKLOAD CLASS is system-only, even for RESET.
            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    // DISK is deprecated; resetting it only emits a notice.
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}
6488
6489pub fn describe_alter_set_cluster(
6490 _: &StatementContext,
6491 _: AlterSetClusterStatement<Aug>,
6492) -> Result<StatementDesc, PlanError> {
6493 Ok(StatementDesc::new(None))
6494}
6495
6496pub fn plan_alter_item_set_cluster(
6497 scx: &StatementContext,
6498 AlterSetClusterStatement {
6499 if_exists,
6500 set_cluster: in_cluster_name,
6501 name,
6502 object_type,
6503 }: AlterSetClusterStatement<Aug>,
6504) -> Result<Plan, PlanError> {
6505 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6506
6507 let object_type = object_type.into();
6508
6509 match object_type {
6511 ObjectType::MaterializedView => {}
6512 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6513 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6514 }
6515 _ => {
6516 bail_never_supported!(
6517 format!("ALTER {object_type} SET CLUSTER"),
6518 "sql/alter-set-cluster/",
6519 format!("{object_type} has no associated cluster")
6520 )
6521 }
6522 }
6523
6524 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6525
6526 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6527 Some(entry) => {
6528 let current_cluster = entry.cluster_id();
6529 let Some(current_cluster) = current_cluster else {
6530 sql_bail!("No cluster associated with {name}");
6531 };
6532
6533 if current_cluster == in_cluster.id() {
6534 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6535 } else {
6536 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6537 id: entry.id(),
6538 set_cluster: in_cluster.id(),
6539 }))
6540 }
6541 }
6542 None => {
6543 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6544 name: name.to_ast_string_simple(),
6545 object_type,
6546 });
6547
6548 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6549 }
6550 }
6551}
6552
6553pub fn describe_alter_object_rename(
6554 _: &StatementContext,
6555 _: AlterObjectRenameStatement,
6556) -> Result<StatementDesc, PlanError> {
6557 Ok(StatementDesc::new(None))
6558}
6559
/// Plans an `ALTER <object type> ... RENAME TO ...` statement by dispatching
/// to the kind-specific rename planner.
///
/// The parser guarantees that the object type and the flavor of the
/// unresolved name agree, so the fallback arm is unreachable.
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        // All item-like objects share a single rename planner.
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}
6596
/// Plans `ALTER SCHEMA ... RENAME TO ...`.
///
/// Rejects renaming the temporary (`mz_temp`) schema and system schemas, and
/// rejects a target name that already exists in the same database. With
/// `IF EXISTS`, an unknown source schema becomes a no-op plus a notice.
pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    // An unqualified name equal to the temp-schema name refers to the
    // session's ambient temporary schema, which cannot be renamed.
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!(
            "cannot rename schemas in the ambient database: {:?}",
            mz_repr::namespaces::MZ_TEMP_SCHEMA
        );
    }

    // With IF EXISTS, a missing schema produces a notice and a no-op plan.
    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // The destination name must not already resolve in the same database.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    // System schemas (e.g. mz_catalog) can never be renamed.
    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}
6644
/// Plans `ALTER SCHEMA ... SWAP WITH ...`.
///
/// Both schemas must live in the same, non-ambient database and neither may
/// be a system schema. `gen_temp_suffix` supplies a random suffix for the
/// intermediate `mz_schema_swap_<suffix>` name used during the three-way
/// rename; the provided `check` closure accepts a suffix only when the
/// resulting temp name is unused in that database.
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    // Reject the session temp schema on either side, by name, before
    // resolving anything.
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    let schema_a = scx.resolve_schema(name_a.clone())?;

    // Schema B is resolved in A's database, so both sides are guaranteed to
    // be in the same (non-ambient) database.
    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // A suffix is acceptable when `mz_schema_swap_<suffix>` does not resolve
    // to an existing schema in the database.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}
6701
/// Plans `ALTER <item> ... RENAME TO ...` for schema-qualified catalog items.
///
/// Rejects the rename when the proposed name would collide with an existing
/// item or type in the same schema. With `IF EXISTS`, an unknown item becomes
/// a no-op plus a notice. `ALTER VIEW` on a materialized view is translated
/// into a dedicated, friendlier error.
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Special-case: `ALTER VIEW` pointed at a materialized view gets a
        // dedicated error message rather than the generic mismatch.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // The new name keeps the item's current schema qualification.
            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // Items and types live in overlapping namespaces: renaming a type
            // must avoid both existing types and conflicting items, and vice
            // versa. Which lookups apply depends on what we are renaming.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        // IF EXISTS and the item is missing: emit a notice, plan a no-op.
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6770
6771pub fn plan_alter_cluster_rename(
6772 scx: &mut StatementContext,
6773 object_type: ObjectType,
6774 name: Ident,
6775 to_name: Ident,
6776 if_exists: bool,
6777) -> Result<Plan, PlanError> {
6778 match resolve_cluster(scx, &name, if_exists)? {
6779 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6780 id: entry.id(),
6781 name: entry.name().to_string(),
6782 to_name: ident(to_name),
6783 })),
6784 None => {
6785 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6786 name: name.to_ast_string_simple(),
6787 object_type,
6788 });
6789
6790 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6791 }
6792 }
6793}
6794
6795pub fn plan_alter_cluster_swap<F>(
6796 scx: &mut StatementContext,
6797 name_a: Ident,
6798 name_b: Ident,
6799 gen_temp_suffix: F,
6800) -> Result<Plan, PlanError>
6801where
6802 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6803{
6804 let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6805 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6806
6807 let check = |temp_suffix: &str| {
6808 let mut temp_name = ident!("mz_schema_swap_");
6809 temp_name.append_lossy(temp_suffix);
6810 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6811 Err(CatalogError::UnknownCluster(_)) => true,
6813 Ok(_) | Err(_) => false,
6815 }
6816 };
6817 let temp_suffix = gen_temp_suffix(&check)?;
6818 let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6819
6820 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6821 id_a: cluster_a.id(),
6822 id_b: cluster_b.id(),
6823 name_a: name_a.into_string(),
6824 name_b: name_b.into_string(),
6825 name_temp,
6826 }))
6827}
6828
6829pub fn plan_alter_cluster_replica_rename(
6830 scx: &mut StatementContext,
6831 object_type: ObjectType,
6832 name: QualifiedReplica,
6833 to_item_name: Ident,
6834 if_exists: bool,
6835) -> Result<Plan, PlanError> {
6836 match resolve_cluster_replica(scx, &name, if_exists)? {
6837 Some((cluster, replica)) => {
6838 ensure_cluster_is_not_managed(scx, cluster.id())?;
6839 Ok(Plan::AlterClusterReplicaRename(
6840 AlterClusterReplicaRenamePlan {
6841 cluster_id: cluster.id(),
6842 replica_id: replica,
6843 name: QualifiedReplica {
6844 cluster: Ident::new(cluster.name())?,
6845 replica: name.replica,
6846 },
6847 to_name: normalize::ident(to_item_name),
6848 },
6849 ))
6850 }
6851 None => {
6852 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6853 name: name.to_ast_string_simple(),
6854 object_type,
6855 });
6856
6857 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6858 }
6859 }
6860}
6861
6862pub fn describe_alter_object_swap(
6863 _: &StatementContext,
6864 _: AlterObjectSwapStatement,
6865) -> Result<StatementDesc, PlanError> {
6866 Ok(StatementDesc::new(None))
6867}
6868
6869pub fn plan_alter_object_swap(
6870 scx: &mut StatementContext,
6871 stmt: AlterObjectSwapStatement,
6872) -> Result<Plan, PlanError> {
6873 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6874
6875 let AlterObjectSwapStatement {
6876 object_type,
6877 name_a,
6878 name_b,
6879 } = stmt;
6880 let object_type = object_type.into();
6881
6882 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6884 let mut attempts = 0;
6885 let name_temp = loop {
6886 attempts += 1;
6887 if attempts > 10 {
6888 tracing::warn!("Unable to generate temp id for swapping");
6889 sql_bail!("unable to swap!");
6890 }
6891
6892 let short_id = mz_ore::id_gen::temp_id();
6894 if check_fn(&short_id) {
6895 break short_id;
6896 }
6897 };
6898
6899 Ok(name_temp)
6900 };
6901
6902 match (object_type, name_a, name_b) {
6903 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6904 plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6905 }
6906 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6907 plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6908 }
6909 (object_type, _, _) => Err(PlanError::Unsupported {
6910 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6911 discussion_no: None,
6912 }),
6913 }
6914}
6915
6916pub fn describe_alter_retain_history(
6917 _: &StatementContext,
6918 _: AlterRetainHistoryStatement<Aug>,
6919) -> Result<StatementDesc, PlanError> {
6920 Ok(StatementDesc::new(None))
6921}
6922
6923pub fn plan_alter_retain_history(
6924 scx: &StatementContext,
6925 AlterRetainHistoryStatement {
6926 object_type,
6927 if_exists,
6928 name,
6929 history,
6930 }: AlterRetainHistoryStatement<Aug>,
6931) -> Result<Plan, PlanError> {
6932 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6933}
6934
/// Shared implementation for altering an object's `RETAIN HISTORY` setting.
///
/// `history` of `None` means RESET: the window falls back to the default
/// logical compaction window. Only views, materialized views, tables,
/// sources, and indexes accept RETAIN HISTORY (and plain views are then
/// rejected below with a more specific error). With `IF EXISTS`, a missing
/// object becomes a no-op plus a notice.
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    // Only item-named object types in this list can carry RETAIN HISTORY.
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Error precedence: `ALTER VIEW` on a materialized view gets its
            // dedicated error, then plain views are rejected outright, then
            // any other statement/item type mismatch is reported generically.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // SET keeps the raw option value (for catalog SQL round-tripping)
            // plus the parsed window; RESET uses the default window.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        // IF EXISTS and the object is missing: notice plus no-op plan.
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
7006
/// Shared implementation for `ALTER SOURCE ... SET/RESET (TIMESTAMP INTERVAL)`.
///
/// `value` of `None` means RESET: the interval falls back to the system
/// default. A SET value is validated against the system's configured
/// min/max timestamp interval bounds. With `IF EXISTS`, a missing source
/// becomes a no-op plus a notice.
fn alter_source_timestamp_interval(
    scx: &StatementContext,
    if_exists: bool,
    source_name: UnresolvedItemName,
    value: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Source;
    match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            // The name must actually refer to a source.
            if entry.item_type() != CatalogItemType::Source {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            match value {
                // SET: parse the interval and bound-check it.
                Some(val) => {
                    let val = match val {
                        WithOptionValue::Value(v) => v,
                        _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
                    };
                    let duration = Duration::try_from_value(val.clone())?;

                    // Enforce the system-configured interval bounds.
                    let min = scx.catalog.system_vars().min_timestamp_interval();
                    let max = scx.catalog.system_vars().max_timestamp_interval();
                    if duration < min || duration > max {
                        return Err(PlanError::InvalidTimestampInterval {
                            min,
                            max,
                            requested: duration,
                        });
                    }

                    Ok(Plan::AlterSourceTimestampInterval(
                        AlterSourceTimestampIntervalPlan {
                            id: entry.id(),
                            value: Some(val),
                            interval: duration,
                        },
                    ))
                }
                // RESET: revert to the system default interval.
                None => {
                    let interval = scx.catalog.system_vars().default_timestamp_interval();
                    Ok(Plan::AlterSourceTimestampInterval(
                        AlterSourceTimestampIntervalPlan {
                            id: entry.id(),
                            value: None,
                            interval,
                        },
                    ))
                }
            }
        }
        // IF EXISTS and the source is missing: notice plus no-op plan.
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: source_name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
7074
7075pub fn describe_alter_secret_options(
7076 _: &StatementContext,
7077 _: AlterSecretStatement<Aug>,
7078) -> Result<StatementDesc, PlanError> {
7079 Ok(StatementDesc::new(None))
7080}
7081
7082pub fn plan_alter_secret(
7083 scx: &mut StatementContext,
7084 stmt: AlterSecretStatement<Aug>,
7085) -> Result<Plan, PlanError> {
7086 let AlterSecretStatement {
7087 name,
7088 if_exists,
7089 value,
7090 } = stmt;
7091 let object_type = ObjectType::Secret;
7092 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7093 Some(entry) => entry.id(),
7094 None => {
7095 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7096 name: name.to_string(),
7097 object_type,
7098 });
7099
7100 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7101 }
7102 };
7103
7104 let secret_as = query::plan_secret_as(scx, value)?;
7105
7106 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7107}
7108
7109pub fn describe_alter_connection(
7110 _: &StatementContext,
7111 _: AlterConnectionStatement<Aug>,
7112) -> Result<StatementDesc, PlanError> {
7113 Ok(StatementDesc::new(None))
7114}
7115
// Derives `AlterConnectionOptionExtracted`, which extracts the optional
// `VALIDATE <bool>` WITH-option from `ALTER CONNECTION` statements.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7117
7118pub fn plan_alter_connection(
7119 scx: &StatementContext,
7120 stmt: AlterConnectionStatement<Aug>,
7121) -> Result<Plan, PlanError> {
7122 let AlterConnectionStatement {
7123 name,
7124 if_exists,
7125 actions,
7126 with_options,
7127 } = stmt;
7128 let conn_name = normalize::unresolved_item_name(name)?;
7129 let entry = match scx.catalog.resolve_item(&conn_name) {
7130 Ok(entry) => entry,
7131 Err(_) if if_exists => {
7132 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7133 name: conn_name.to_string(),
7134 object_type: ObjectType::Sink,
7135 });
7136
7137 return Ok(Plan::AlterNoop(AlterNoopPlan {
7138 object_type: ObjectType::Connection,
7139 }));
7140 }
7141 Err(e) => return Err(e.into()),
7142 };
7143
7144 let connection = entry.connection()?;
7145
7146 if actions
7147 .iter()
7148 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7149 {
7150 if actions.len() > 1 {
7151 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7152 }
7153
7154 if !with_options.is_empty() {
7155 sql_bail!(
7156 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7157 with_options
7158 .iter()
7159 .map(|o| o.to_ast_string_simple())
7160 .join(", ")
7161 );
7162 }
7163
7164 if !matches!(connection, Connection::Ssh(_)) {
7165 sql_bail!(
7166 "{} is not an SSH connection",
7167 scx.catalog.resolve_full_name(entry.name())
7168 )
7169 }
7170
7171 return Ok(Plan::AlterConnection(AlterConnectionPlan {
7172 id: entry.id(),
7173 action: crate::plan::AlterConnectionAction::RotateKeys,
7174 }));
7175 }
7176
7177 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7178 if options.validate.is_some() {
7179 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7180 }
7181
7182 let validate = match options.validate {
7183 Some(val) => val,
7184 None => {
7185 scx.catalog
7186 .system_vars()
7187 .enable_default_connection_validation()
7188 && connection.validate_by_default()
7189 }
7190 };
7191
7192 let connection_type = match connection {
7193 Connection::Aws(_) => CreateConnectionType::Aws,
7194 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7195 Connection::Kafka(_) => CreateConnectionType::Kafka,
7196 Connection::Csr(_) => CreateConnectionType::Csr,
7197 Connection::Postgres(_) => CreateConnectionType::Postgres,
7198 Connection::Ssh(_) => CreateConnectionType::Ssh,
7199 Connection::MySql(_) => CreateConnectionType::MySql,
7200 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7201 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7202 };
7203
7204 let specified_options: BTreeSet<_> = actions
7206 .iter()
7207 .map(|action: &AlterConnectionAction<Aug>| match action {
7208 AlterConnectionAction::SetOption(option) => option.name.clone(),
7209 AlterConnectionAction::DropOption(name) => name.clone(),
7210 AlterConnectionAction::RotateKeys => unreachable!(),
7211 })
7212 .collect();
7213
7214 for invalid in INALTERABLE_OPTIONS {
7215 if specified_options.contains(invalid) {
7216 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7217 }
7218 }
7219
7220 connection::validate_options_per_connection_type(connection_type, specified_options)?;
7221
7222 let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
7224 actions.into_iter().partition_map(|action| match action {
7225 AlterConnectionAction::SetOption(option) => Either::Left(option),
7226 AlterConnectionAction::DropOption(name) => Either::Right(name),
7227 AlterConnectionAction::RotateKeys => unreachable!(),
7228 });
7229
7230 let set_options: BTreeMap<_, _> = set_options_vec
7231 .clone()
7232 .into_iter()
7233 .map(|option| (option.name, option.value))
7234 .collect();
7235
7236 let connection_options_extracted =
7240 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7241
7242 let duplicates: Vec<_> = connection_options_extracted
7243 .seen
7244 .intersection(&drop_options)
7245 .collect();
7246
7247 if !duplicates.is_empty() {
7248 sql_bail!(
7249 "cannot both SET and DROP/RESET options {}",
7250 duplicates
7251 .iter()
7252 .map(|option| option.to_string())
7253 .join(", ")
7254 )
7255 }
7256
7257 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7258 let set_options_count = mutually_exclusive_options
7259 .iter()
7260 .filter(|o| set_options.contains_key(o))
7261 .count();
7262 let drop_options_count = mutually_exclusive_options
7263 .iter()
7264 .filter(|o| drop_options.contains(o))
7265 .count();
7266
7267 if set_options_count > 0 && drop_options_count > 0 {
7269 sql_bail!(
7270 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7271 connection_type,
7272 mutually_exclusive_options
7273 .iter()
7274 .map(|option| option.to_string())
7275 .join(", ")
7276 )
7277 }
7278
7279 if set_options_count > 0 || drop_options_count > 0 {
7284 drop_options.extend(mutually_exclusive_options.iter().cloned());
7285 }
7286
7287 }
7290
7291 Ok(Plan::AlterConnection(AlterConnectionPlan {
7292 id: entry.id(),
7293 action: crate::plan::AlterConnectionAction::AlterOptions {
7294 set_options,
7295 drop_options,
7296 validate,
7297 },
7298 }))
7299}
7300
7301pub fn describe_alter_sink(
7302 _: &StatementContext,
7303 _: AlterSinkStatement<Aug>,
7304) -> Result<StatementDesc, PlanError> {
7305 Ok(StatementDesc::new(None))
7306}
7307
/// Plans `ALTER SINK`.
///
/// `ALTER SINK ... SET FROM ...` is implemented by re-parsing the sink's
/// stored `CREATE SINK` SQL, swapping in the new `FROM` relation, re-planning
/// it as a fresh CREATE SINK, and bumping the sink version. SET/RESET of
/// other options is unsupported. With `IF EXISTS`, a missing sink becomes a
/// no-op plus a notice.
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always operate against the latest version of the sink's definition.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // Round-trip through the stored CREATE SINK SQL: parse it back
            // into an AST so the sink can be re-planned from scratch.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Re-resolve names, then substitute the new FROM relation.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            // The altered sink is a new version of the existing one.
            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}
7367
7368pub fn describe_alter_source(
7369 _: &StatementContext,
7370 _: AlterSourceStatement<Aug>,
7371) -> Result<StatementDesc, PlanError> {
7372 Ok(StatementDesc::new(None))
7374}
7375
// Derives `AlterSourceAddSubsourceOptionExtracted`, which extracts the
// WITH-options accepted by `ALTER SOURCE ... ADD SUBSOURCE`.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7382
/// Plans `ALTER SOURCE`.
///
/// Only `RETAIN HISTORY` and `TIMESTAMP INTERVAL` may be SET or RESET, each
/// only by itself; these delegate to the shared helpers. Subsource drops were
/// removed in favor of `DROP SOURCE`, and ADD SUBSOURCE / REFRESH REFERENCES
/// are rewritten during purification, so they cannot reach planning. With
/// `IF EXISTS`, a missing source becomes a no-op plus a notice.
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    // Every arm below diverges (returns or bails), so the match has type `!`.
    match action {
        AlterSourceAction::SetOptions(options) => {
            // Each supported option must appear alone in the SET list.
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            if option.name == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            // Mirror of SET: only RETAIN HISTORY or TIMESTAMP INTERVAL,
            // alone, may be RESET; `None` value means "revert to default".
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            if option == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}
7469
7470pub fn describe_alter_system_set(
7471 _: &StatementContext,
7472 _: AlterSystemSetStatement,
7473) -> Result<StatementDesc, PlanError> {
7474 Ok(StatementDesc::new(None))
7475}
7476
7477pub fn plan_alter_system_set(
7478 _: &StatementContext,
7479 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7480) -> Result<Plan, PlanError> {
7481 let name = name.to_string();
7482 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7483 name,
7484 value: scl::plan_set_variable_to(to)?,
7485 }))
7486}
7487
7488pub fn describe_alter_system_reset(
7489 _: &StatementContext,
7490 _: AlterSystemResetStatement,
7491) -> Result<StatementDesc, PlanError> {
7492 Ok(StatementDesc::new(None))
7493}
7494
7495pub fn plan_alter_system_reset(
7496 _: &StatementContext,
7497 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7498) -> Result<Plan, PlanError> {
7499 let name = name.to_string();
7500 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7501}
7502
7503pub fn describe_alter_system_reset_all(
7504 _: &StatementContext,
7505 _: AlterSystemResetAllStatement,
7506) -> Result<StatementDesc, PlanError> {
7507 Ok(StatementDesc::new(None))
7508}
7509
7510pub fn plan_alter_system_reset_all(
7511 _: &StatementContext,
7512 _: AlterSystemResetAllStatement,
7513) -> Result<Plan, PlanError> {
7514 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7515}
7516
7517pub fn describe_alter_role(
7518 _: &StatementContext,
7519 _: AlterRoleStatement<Aug>,
7520) -> Result<StatementDesc, PlanError> {
7521 Ok(StatementDesc::new(None))
7522}
7523
7524pub fn plan_alter_role(
7525 scx: &StatementContext,
7526 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7527) -> Result<Plan, PlanError> {
7528 let option = match option {
7529 AlterRoleOption::Attributes(attrs) => {
7530 let attrs = plan_role_attributes(attrs, scx)?;
7531 PlannedAlterRoleOption::Attributes(attrs)
7532 }
7533 AlterRoleOption::Variable(variable) => {
7534 let var = plan_role_variable(variable)?;
7535 PlannedAlterRoleOption::Variable(var)
7536 }
7537 };
7538
7539 Ok(Plan::AlterRole(AlterRolePlan {
7540 id: name.id,
7541 name: name.name,
7542 option,
7543 }))
7544}
7545
7546pub fn describe_alter_table_add_column(
7547 _: &StatementContext,
7548 _: AlterTableAddColumnStatement<Aug>,
7549) -> Result<StatementDesc, PlanError> {
7550 Ok(StatementDesc::new(None))
7551}
7552
/// Plans `ALTER TABLE ... ADD COLUMN`, gated behind the
/// `enable_alter_table_add_column` feature flag.
///
/// The new column is always nullable. With `IF EXISTS`, a missing table
/// becomes a no-op plus a notice; with `IF NOT EXISTS` on the column, an
/// already-present column likewise becomes a no-op plus a notice.
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                // Validate against the latest version of the relation.
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.relation_desc().expect("table has desc").into_owned();
                (item.id(), item_name, desc)
            }
            // IF EXISTS and the table is missing: notice plus no-op plan.
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    // A column with this name already exists: no-op under IF NOT EXISTS,
    // otherwise an error.
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // Added columns are nullable: existing rows have no value for them.
    let column_type = scalar_type.nullable(true);
    // Re-parse the type's stable AST rendering to keep the raw SQL form of
    // the type alongside the planned type.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}
7615
7616pub fn describe_alter_materialized_view_apply_replacement(
7617 _: &StatementContext,
7618 _: AlterMaterializedViewApplyReplacementStatement,
7619) -> Result<StatementDesc, PlanError> {
7620 Ok(StatementDesc::new(None))
7621}
7622
/// Plans `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT ...`, gated behind
/// the `enable_replacement_materialized_views` feature flag.
///
/// The named replacement must have been created as a replacement targeting
/// this exact materialized view. With `IF EXISTS`, a missing target view
/// becomes a no-op plus a notice.
pub fn plan_alter_materialized_view_apply_replacement(
    scx: &StatementContext,
    stmt: AlterMaterializedViewApplyReplacementStatement,
) -> Result<Plan, PlanError> {
    let AlterMaterializedViewApplyReplacementStatement {
        if_exists,
        name,
        replacement_name,
    } = stmt;

    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

    let object_type = ObjectType::MaterializedView;
    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // `if_exists` is false here, so resolution either errors or yields Some.
    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
        .expect("if_exists not set");

    // The replacement must declare this materialized view as its target.
    if replacement.replacement_target() != Some(mv.id()) {
        return Err(PlanError::InvalidReplacement {
            item_type: mv.item_type(),
            item_name: scx.catalog.minimal_qualification(mv.name()),
            replacement_type: replacement.item_type(),
            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
        });
    }

    Ok(Plan::AlterMaterializedViewApplyReplacement(
        AlterMaterializedViewApplyReplacementPlan {
            id: mv.id(),
            replacement_id: replacement.id(),
        },
    ))
}
7663
7664pub fn describe_comment(
7665 _: &StatementContext,
7666 _: CommentStatement<Aug>,
7667) -> Result<StatementDesc, PlanError> {
7668 Ok(StatementDesc::new(None))
7669}
7670
7671pub fn plan_comment(
7672 scx: &mut StatementContext,
7673 stmt: CommentStatement<Aug>,
7674) -> Result<Plan, PlanError> {
7675 const MAX_COMMENT_LENGTH: usize = 1024;
7676
7677 let CommentStatement { object, comment } = stmt;
7678
7679 if let Some(c) = &comment {
7681 if c.len() > 1024 {
7682 return Err(PlanError::CommentTooLong {
7683 length: c.len(),
7684 max_size: MAX_COMMENT_LENGTH,
7685 });
7686 }
7687 }
7688
7689 let (object_id, column_pos) = match &object {
7690 com_ty @ CommentObjectType::Table { name }
7691 | com_ty @ CommentObjectType::View { name }
7692 | com_ty @ CommentObjectType::MaterializedView { name }
7693 | com_ty @ CommentObjectType::Index { name }
7694 | com_ty @ CommentObjectType::Func { name }
7695 | com_ty @ CommentObjectType::Connection { name }
7696 | com_ty @ CommentObjectType::Source { name }
7697 | com_ty @ CommentObjectType::Sink { name }
7698 | com_ty @ CommentObjectType::Secret { name }
7699 | com_ty @ CommentObjectType::ContinualTask { name } => {
7700 let item = scx.get_item_by_resolved_name(name)?;
7701 match (com_ty, item.item_type()) {
7702 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7703 (CommentObjectId::Table(item.id()), None)
7704 }
7705 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7706 (CommentObjectId::View(item.id()), None)
7707 }
7708 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7709 (CommentObjectId::MaterializedView(item.id()), None)
7710 }
7711 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7712 (CommentObjectId::Index(item.id()), None)
7713 }
7714 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7715 (CommentObjectId::Func(item.id()), None)
7716 }
7717 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7718 (CommentObjectId::Connection(item.id()), None)
7719 }
7720 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7721 (CommentObjectId::Source(item.id()), None)
7722 }
7723 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7724 (CommentObjectId::Sink(item.id()), None)
7725 }
7726 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7727 (CommentObjectId::Secret(item.id()), None)
7728 }
7729 (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7730 (CommentObjectId::ContinualTask(item.id()), None)
7731 }
7732 (com_ty, cat_ty) => {
7733 let expected_type = match com_ty {
7734 CommentObjectType::Table { .. } => ObjectType::Table,
7735 CommentObjectType::View { .. } => ObjectType::View,
7736 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7737 CommentObjectType::Index { .. } => ObjectType::Index,
7738 CommentObjectType::Func { .. } => ObjectType::Func,
7739 CommentObjectType::Connection { .. } => ObjectType::Connection,
7740 CommentObjectType::Source { .. } => ObjectType::Source,
7741 CommentObjectType::Sink { .. } => ObjectType::Sink,
7742 CommentObjectType::Secret { .. } => ObjectType::Secret,
7743 _ => unreachable!("these are the only types we match on"),
7744 };
7745
7746 return Err(PlanError::InvalidObjectType {
7747 expected_type: SystemObjectType::Object(expected_type),
7748 actual_type: SystemObjectType::Object(cat_ty.into()),
7749 object_name: item.name().item.clone(),
7750 });
7751 }
7752 }
7753 }
7754 CommentObjectType::Type { ty } => match ty {
7755 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7756 sql_bail!("cannot comment on anonymous list or map type");
7757 }
7758 ResolvedDataType::Named { id, modifiers, .. } => {
7759 if !modifiers.is_empty() {
7760 sql_bail!("cannot comment on type with modifiers");
7761 }
7762 (CommentObjectId::Type(*id), None)
7763 }
7764 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7765 },
7766 CommentObjectType::Column { name } => {
7767 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7768 match item.item_type() {
7769 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7770 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7771 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7772 CatalogItemType::MaterializedView => {
7773 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7774 }
7775 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7776 r => {
7777 return Err(PlanError::Unsupported {
7778 feature: format!("Specifying comments on a column of {r}"),
7779 discussion_no: None,
7780 });
7781 }
7782 }
7783 }
7784 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7785 CommentObjectType::Database { name } => {
7786 (CommentObjectId::Database(*name.database_id()), None)
7787 }
7788 CommentObjectType::Schema { name } => {
7789 if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7793 sql_bail!(
7794 "cannot comment on schema {} because it is a temporary schema",
7795 mz_repr::namespaces::MZ_TEMP_SCHEMA
7796 );
7797 }
7798 (
7799 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7800 None,
7801 )
7802 }
7803 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7804 CommentObjectType::ClusterReplica { name } => {
7805 let replica = scx.catalog.resolve_cluster_replica(name)?;
7806 (
7807 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7808 None,
7809 )
7810 }
7811 CommentObjectType::NetworkPolicy { name } => {
7812 (CommentObjectId::NetworkPolicy(name.id), None)
7813 }
7814 };
7815
7816 if let Some(p) = column_pos {
7822 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7823 max_num_columns: MAX_NUM_COLUMNS,
7824 req_num_columns: p,
7825 })?;
7826 }
7827
7828 Ok(Plan::Comment(CommentPlan {
7829 object_id,
7830 sub_component: column_pos,
7831 comment,
7832 }))
7833}
7834
7835pub(crate) fn resolve_cluster<'a>(
7836 scx: &'a StatementContext,
7837 name: &'a Ident,
7838 if_exists: bool,
7839) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7840 match scx.resolve_cluster(Some(name)) {
7841 Ok(cluster) => Ok(Some(cluster)),
7842 Err(_) if if_exists => Ok(None),
7843 Err(e) => Err(e),
7844 }
7845}
7846
7847pub(crate) fn resolve_cluster_replica<'a>(
7848 scx: &'a StatementContext,
7849 name: &QualifiedReplica,
7850 if_exists: bool,
7851) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7852 match scx.resolve_cluster(Some(&name.cluster)) {
7853 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7854 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7855 None if if_exists => Ok(None),
7856 None => Err(sql_err!(
7857 "CLUSTER {} has no CLUSTER REPLICA named {}",
7858 cluster.name(),
7859 name.replica.as_str().quoted(),
7860 )),
7861 },
7862 Err(_) if if_exists => Ok(None),
7863 Err(e) => Err(e),
7864 }
7865}
7866
7867pub(crate) fn resolve_database<'a>(
7868 scx: &'a StatementContext,
7869 name: &'a UnresolvedDatabaseName,
7870 if_exists: bool,
7871) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7872 match scx.resolve_database(name) {
7873 Ok(database) => Ok(Some(database)),
7874 Err(_) if if_exists => Ok(None),
7875 Err(e) => Err(e),
7876 }
7877}
7878
7879pub(crate) fn resolve_schema<'a>(
7880 scx: &'a StatementContext,
7881 name: UnresolvedSchemaName,
7882 if_exists: bool,
7883) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7884 match scx.resolve_schema(name) {
7885 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7886 Err(_) if if_exists => Ok(None),
7887 Err(e) => Err(e),
7888 }
7889}
7890
7891pub(crate) fn resolve_network_policy<'a>(
7892 scx: &'a StatementContext,
7893 name: Ident,
7894 if_exists: bool,
7895) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7896 match scx.catalog.resolve_network_policy(&name.to_string()) {
7897 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7898 id: policy.id(),
7899 name: policy.name().to_string(),
7900 })),
7901 Err(_) if if_exists => Ok(None),
7902 Err(e) => Err(e.into()),
7903 }
7904}
7905
7906pub(crate) fn resolve_item_or_type<'a>(
7907 scx: &'a StatementContext,
7908 object_type: ObjectType,
7909 name: UnresolvedItemName,
7910 if_exists: bool,
7911) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7912 let name = normalize::unresolved_item_name(name)?;
7913 let catalog_item = match object_type {
7914 ObjectType::Type => scx.catalog.resolve_type(&name),
7915 _ => scx.catalog.resolve_item(&name),
7916 };
7917
7918 match catalog_item {
7919 Ok(item) => {
7920 let is_type = ObjectType::from(item.item_type());
7921 if object_type == is_type {
7922 Ok(Some(item))
7923 } else {
7924 Err(PlanError::MismatchedObjectType {
7925 name: scx.catalog.minimal_qualification(item.name()),
7926 is_type,
7927 expected_type: object_type,
7928 })
7929 }
7930 }
7931 Err(_) if if_exists => Ok(None),
7932 Err(e) => Err(e.into()),
7933 }
7934}
7935
7936fn ensure_cluster_is_not_managed(
7938 scx: &StatementContext,
7939 cluster_id: ClusterId,
7940) -> Result<(), PlanError> {
7941 let cluster = scx.catalog.get_cluster(cluster_id);
7942 if cluster.is_managed() {
7943 Err(PlanError::ManagedCluster {
7944 cluster_name: cluster.name().to_string(),
7945 })
7946 } else {
7947 Ok(())
7948 }
7949}