use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, IcebergSinkConfigOption, Ident,
    IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, KeyConstraint,
    LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption,
    MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, NetworkPolicyOption,
    NetworkPolicyOptionName, NetworkPolicyRuleDefinition, NetworkPolicyRuleOption,
    NetworkPolicyRuleOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema,
    QualifiedReplica, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue,
    ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar,
    SourceErrorPolicy, SourceIncludeMetadata, SqlServerConfigOption, SqlServerConfigOptionName,
    Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption,
    TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName,
    UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
    WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
    PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
    Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
    WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
    transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

// The maximum number of columns a webhook source is allowed to have.
const MAX_NUM_COLUMNS: usize = 256;

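/// Matches replica names of the form `r<N>` (e.g. `r1`), the naming scheme
/// used for the replicas of managed clusters.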
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

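/// Checks that a `PARTITION BY` clause is valid for the given relation: the
/// named columns must form a prefix of the relation's columns, and each must
/// have a type whose encoding preserves order.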
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    // If the clause names more columns than the relation has, log the
    // discrepancy and only check the prefix that could possibly match.
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

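/// Plans a `CREATE TABLE` statement: validates the columns and constraints,
/// tracks versioned column additions, and assembles a [`CreateTablePlan`].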
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // Columns that were added in a later version of the table are
            // handled separately below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Plan the default expression to validate it, but keep the
                    // original AST around for the catalog.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // Versioned columns are added to the relation description below, at
        // their respective versions.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A non-primary unique constraint on nullable
                                // columns is only a key if NULLS NOT DISTINCT
                                // was specified.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // Check for an existing item of the same name, unless `IF NOT EXISTS` was
    // specified.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options are planned against the original (root) version of the
    // relation description.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

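/// Plans `CREATE SOURCE ... FROM WEBHOOK` (or the equivalent table variant),
/// building the relation description from the body format and any included
/// headers, and validating the validation expression, if one was provided.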
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    // Unless multi-replica sources are enabled, webhook sources must run on a
    // cluster with at most one replica.
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // A validation expression that references no columns of the request
        // either always passes or always fails, which is never what the user
        // wants.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions must be deterministic; `now()` is the one
        // unmaterializable function the webhook machinery can handle.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map individually included headers to their own columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double-check that the names and types haven't gotten out of sync.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Ensure the column names are unique.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Webhook sources always exist in the epoch-milliseconds timeline.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // For sources, the cluster is recorded on the plan below rather
            // than on the data source itself.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

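/// Plans a `CREATE SOURCE` statement: plans the external connection, applies
/// the envelope and format to derive the output relation, and validates
/// source-level options such as `TIMESTAMP INTERVAL` and `RETAIN HISTORY`.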
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // When the source table syntax is forced, the source itself may not carry
    // output-shaping options; those belong on the tables created from it.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // Check for an existing item of the same name, unless `IF NOT EXISTS` was
    // specified.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Sources using the CDCv2 envelope carry their own timestamps and so live
    // in their own timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

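/// Plans the connection-specific portion of a `CREATE SOURCE` statement by
/// dispatching on the kind of external system being connected to.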
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

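/// Plans a load generator source connection from the generator AST and its
/// `TICK INTERVAL`, `AS OF`, and `UP TO` options.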
fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

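/// Plans a MySQL source connection, decoding the `DETAILS` option that
/// purification stored on the statement.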
fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // Text/exclude columns are handled during purification and are not
        // needed here.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

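/// Plans a SQL Server source connection, decoding the `DETAILS` option that
/// purification stored on the statement.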
fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

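/// Plans a Postgres source connection, decoding the `DETAILS` option that
/// purification stored on the statement.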
fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // Text/exclude columns are handled during purification and are not
        // needed here.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

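/// Plans a Kafka source connection, validating topic options and translating
/// `INCLUDE <metadata>` clauses into metadata columns.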
fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        // START TIMESTAMP is resolved to offsets during purification.
        start_timestamp: _,
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // `INCLUDE KEY` is handled by the key envelope, not as a
                // metadata column.
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

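/// Applies the requested envelope and format to a source's key and value
/// relation descriptions, returning the resulting output description, the
/// planned [`SourceEnvelope`], and the data encoding (if a format was given).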
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If a format is specified, the source must produce a single
            // `bytes` column for the decoder to consume.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // With ENVELOPE NONE, a Kafka message may have a null key, so any
            // included key columns must be marked nullable.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // The key-value load generator produces its own key without a key format,
    // so it may include its key even when there is no key encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // Upsert sources always include their key; if the user did not
            // request one explicitly, fall back to the default key envelope.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

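/// Plans the relation description for a source export (a subsource or source
/// table) that was declared with explicit columns and constraints.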
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A non-primary unique constraint on nullable
                                // columns is only a key if NULLS NOT DISTINCT
                                // was specified.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

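/// Plans a `CREATE SUBSOURCE` statement, which describes either a source's
/// progress collection or an export of one of its external references.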
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // A subsource is either a progress subsource or an export of an upstream
    // reference, but never both.
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // When the source table syntax is forced, no new export subsources
        // may be created.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        // Decode the details produced during purification.
        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources do not apply a format or envelope of their own.
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);

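/// Plans `CREATE TABLE ... FROM SOURCE`, the source-table analogue of a
/// subsource: a table whose data is ingested from an existing source.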
1735pub fn plan_create_table_from_source(
1736 scx: &StatementContext,
1737 stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739 if !scx.catalog.system_vars().enable_create_table_from_source() {
1740 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741 }
1742
1743 let CreateTableFromSourceStatement {
1744 name,
1745 columns,
1746 constraints,
1747 if_not_exists,
1748 source,
1749 external_reference,
1750 envelope,
1751 format,
1752 include_metadata,
1753 with_options,
1754 } = &stmt;
1755
1756 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758 let TableFromSourceOptionExtracted {
1759 text_columns,
1760 exclude_columns,
1761 retain_history,
1762 partition_by,
1763 details,
1764 seen: _,
1765 } = with_options.clone().try_into()?;
1766
1767 let source_item = scx.get_item_by_resolved_name(source)?;
1768 let ingestion_id = source_item.id();
1769
1770 let details = details
1773 .as_ref()
1774 .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776 let details =
1777 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778 let details =
1779 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781 if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782 && include_metadata
1783 .iter()
1784 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785 {
1786 sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1788 }
1789 if !matches!(
1790 details,
1791 SourceExportStatementDetails::Kafka { .. }
1792 | SourceExportStatementDetails::LoadGenerator { .. }
1793 ) && !include_metadata.is_empty()
1794 {
1795 bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796 }
1797
    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
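                    // `INCLUDE KEY` is handled via the key envelope rather
                    // than as a metadata column, so it contributes nothing
                    // here.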
                    SourceIncludeMetadata::Key { .. } => None,
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

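    // With explicitly defined columns or constraints, the relation description
    // comes from the user's definition; otherwise it is derived from the
    // connection's default key and value descriptions.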
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

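    // CDCv2 sources carry their own timestamps, so they live on an external
    // timeline; everything else uses the epoch-milliseconds timeline.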
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}

generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);

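// `LoadGeneratorOptionExtracted` (generated by `generate_extracted_config!`
// above) carries each option's value with defaults applied, plus the set of
// options that were explicitly written (`seen`).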
impl LoadGeneratorOptionExtracted {
    pub(super) fn ensure_only_valid_options(
        &self,
        loadgen: &ast::LoadGenerator,
    ) -> Result<(), PlanError> {
        use mz_sql_parser::ast::LoadGeneratorOptionName::*;

        let mut options = self.seen.clone();

        let permitted_options: &[_] = match loadgen {
            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
            ast::LoadGenerator::KeyValue => &[
                TickInterval,
                Keys,
                SnapshotRounds,
                TransactionalSnapshot,
                ValueSize,
                Seed,
                Partitions,
                BatchSize,
            ],
        };

        for o in permitted_options {
            options.remove(o);
        }

        if !options.is_empty() {
            sql_bail!(
                "{} load generators do not support {} values",
                loadgen,
                options.iter().join(", ")
            )
        }

        Ok(())
    }
}

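/// Converts a load generator AST node and its options into a
/// [`LoadGenerator`], validating feature flags and option combinations along
/// the way.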
pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    extracted.ensure_only_valid_options(loadgen)?;

    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
            LoadGenerator::Datums
        }
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,
                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
                    }
                };
            }

            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("BATCH SIZE must not be larger than KEYS")
            }

            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}

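/// Checks that a Debezium-formatted value relation has the expected shape:
/// an `after` column, and optionally a `before` column of the same record
/// type. Returns the column indexes of `before` (if present) and `after`.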
fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
    let before = value_desc.get_by_name(&"before".into());
    let (after_idx, after_ty) = value_desc
        .get_by_name(&"after".into())
        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
    let before_idx = if let Some((before_idx, before_ty)) = before {
        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
            sql_bail!("'before' column must be of type record");
        }
        if before_ty != after_ty {
            sql_bail!("'before' type differs from 'after' column");
        }
        Some(before_idx)
    } else {
        None
    };
    Ok((before_idx, after_idx))
}

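/// Resolves a `FORMAT`/`KEY FORMAT ... VALUE FORMAT` specifier into a
/// [`SourceDataEncoding`], enforcing that envelopes which read keys
/// (`DEBEZIUM`, `UPSERT`) actually have a key encoding.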
fn get_encoding(
    scx: &StatementContext,
    format: &FormatSpecifier<Aug>,
    envelope: &ast::SourceEnvelope,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let encoding = match format {
        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
        FormatSpecifier::KeyValue { key, value } => {
            let key = {
                let encoding = get_encoding_inner(scx, key)?;
                Some(encoding.key.unwrap_or(encoding.value))
            };
            let value = get_encoding_inner(scx, value)?.value;
            SourceDataEncoding { key, value }
        }
    };

    let requires_keyvalue = matches!(
        envelope,
        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
    );
    let is_keyvalue = encoding.key.is_some();
    if requires_keyvalue && !is_keyvalue {
        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
    };

    Ok(encoding)
}

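/// Determines the cluster that should host a source or sink. When the
/// statement names no cluster, the active cluster is resolved and written
/// back into `in_cluster` so the normalized `CREATE` SQL records the choice.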
fn source_sink_cluster_config<'a, 'ctx>(
    scx: &'a StatementContext<'ctx>,
    in_cluster: &mut Option<ResolvedClusterName>,
) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
    let cluster = match in_cluster {
        None => {
            let cluster = scx.catalog.resolve_cluster(None)?;
            *in_cluster = Some(ResolvedClusterName {
                id: cluster.id(),
                print_name: None,
            });
            cluster
        }
        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
    };

    Ok(cluster)
}

generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));

#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    pub confluent_wire_format: bool,
}

fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}

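/// Determines how the source key, when requested via `INCLUDE KEY`, is
/// projected into the relation.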
fn get_key_envelope(
    included_items: &[SourceIncludeMetadata],
    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
    key_envelope_no_encoding: bool,
) -> Result<KeyEnvelope, PlanError> {
    let key_definition = included_items
        .iter()
        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
        match (alias, encoding.and_then(|e| e.key.as_ref())) {
            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
            (Some(name), _) if key_envelope_no_encoding => {
                Ok(KeyEnvelope::Named(name.as_str().to_string()))
            }
            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
            (_, None) => {
                sql_bail!(
                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
                     got bare FORMAT"
                );
            }
        }
    } else {
        Ok(KeyEnvelope::None)
    }
}

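/// Picks the envelope for an unaliased `INCLUDE KEY`: composite (multi-column)
/// keys are flattened into the relation, while single-column keys become a
/// column named `key`.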
fn get_unnamed_key_envelope(
    key: Option<&DataEncoding<ReferencedConnection>>,
) -> Result<KeyEnvelope, PlanError> {
    let is_composite = match key {
        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
        Some(
            DataEncoding::Avro(_)
            | DataEncoding::Csv(_)
            | DataEncoding::Protobuf(_)
            | DataEncoding::Regex { .. },
        ) => true,
        None => false,
    };

    if is_composite {
        Ok(KeyEnvelope::Flattened)
    } else {
        Ok(KeyEnvelope::Named("key".to_string()))
    }
}

pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

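/// Plans the body shared by `CREATE [TEMPORARY] VIEW` and
/// `CREATE OR REPLACE VIEW`: plans the query, applies column renames, and
/// rejects parameters and duplicate column names.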
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}

pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        if let Some(id) = maybe_item_to_drop {
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

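    // Fold the `REFRESH ...` options into a single `RefreshSchedule`, planning
    // `AT`/`ALIGNED TO` expressions down to `mz_timestamp` literals.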
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.into(),
                cascade,
            )?;

            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);

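/// Plans `CREATE CONTINUAL TASK`, including its `TRANSFORM` and `RETAIN`
/// sugared forms.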
pub fn plan_create_continual_task(
    scx: &StatementContext,
    mut stmt: CreateContinualTaskStatement<Aug>,
) -> Result<Plan, PlanError> {
    match &stmt.sugar {
        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
        }
        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
        }
    };
    let cluster_id = match &stmt.in_cluster {
        None => scx.catalog.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;

    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;

    let mut desc = match stmt.columns {
        None => None,
        Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
            for col in columns.iter() {
                desc_columns.push((
                    normalize::column_name(col.name.clone()),
                    SqlColumnType {
                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
                        nullable: false,
                    },
                ));
            }
            Some(RelationDesc::from_names_and_types(desc_columns))
        }
    };
    let input = scx.get_item_by_resolved_name(&stmt.input)?;
    match input.item_type() {
        CatalogItemType::ContinualTask
        | CatalogItemType::Table
        | CatalogItemType::MaterializedView
        | CatalogItemType::Source => {}
        CatalogItemType::Sink
        | CatalogItemType::View
        | CatalogItemType::Index
        | CatalogItemType::Type
        | CatalogItemType::Func
        | CatalogItemType::Secret
        | CatalogItemType::Connection => {
            sql_bail!(
                "CONTINUAL TASK cannot use {} as an input",
                input.item_type()
            );
        }
    }

    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
    let ct_name = stmt.name;
    let placeholder_id = match &ct_name {
        ResolvedItemName::ContinualTask { id, name } => {
            let desc = match desc.as_ref().cloned() {
                Some(x) => x,
                None => {
                    let input_name = scx.catalog.resolve_full_name(input.name());
                    input.desc(&input_name)?.into_owned()
                }
            };
            qcx.ctes.insert(
                *id,
                CteDesc {
                    name: name.item.clone(),
                    desc,
                },
            );
            Some(*id)
        }
        _ => None,
    };

    let mut exprs = Vec::new();
    for (idx, stmt) in stmt.stmts.iter().enumerate() {
        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
        let query::PlannedRootQuery {
            expr,
            desc: desc_query,
            finishing,
            scope: _,
        } = query::plan_ct_query(&mut qcx, query)?;
        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
            &finishing,
            expr.arity()
        ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
        let expr = match desc.as_mut() {
            None => {
                desc = Some(desc_query);
                expr
            }
            Some(desc) => {
                if desc_query.arity() > desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more expressions than target columns",
                        idx
                    );
                }
                if desc_query.arity() < desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more target columns than expressions",
                        idx
                    );
                }
                let target_types = desc.iter_types().map(|x| &x.scalar_type);
                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
                let expr = expr.map_err(|e| {
                    sql_err!(
                        "statement {}: column {} is of type {} but expression is of type {}",
                        idx,
                        desc.get_name(e.column).quoted(),
                        qcx.humanize_scalar_type(&e.target_type, false),
                        qcx.humanize_scalar_type(&e.source_type, false),
                    )
                })?;

                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
                if updated {
                    let new_types = zip_types().map(|(ct, q)| {
                        let mut ct = ct.clone();
                        if q.nullable {
                            ct.nullable = true;
                        }
                        ct
                    });
                    *desc = RelationDesc::from_names_and_types(
                        desc.iter_names().cloned().zip_eq(new_types),
                    );
                }

                expr
            }
        };
        match stmt {
            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
        }
    }
    let expr = exprs
        .into_iter()
        .reduce(|acc, expr| acc.union(expr))
        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = match &ct_name {
        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
        ResolvedItemName::ContinualTask { name, .. } => {
            let name = scx.allocate_qualified_name(name.clone())?;
            let full_name = scx.catalog.resolve_full_name(&name);
            let partial_name = PartialItemName::from(full_name.clone());
            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
                return Err(PlanError::ItemAlreadyExists {
                    name: full_name.to_string(),
                    item_type: item.item_type(),
                });
            }
            name
        }
        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
        name,
        placeholder_id,
        desc,
        input_id: input.global_id(),
        with_snapshot: snapshot.unwrap_or(true),
        continual_task: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            cluster_id,
            non_null_assertions: Vec::new(),
            compaction_window: None,
            refresh_schedule: None,
            as_of,
        },
    }))
}

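/// Extracts the underlying `SELECT` from a continual task `INSERT` or
/// `DELETE` statement, rewriting a `DELETE` into a wildcard `SELECT` over the
/// task itself (whose rows the caller negates). Returns `None` for statement
/// shapes the planner does not support.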
fn continual_task_query<'a>(
    ct_name: &ResolvedItemName,
    stmt: &'a ast::ContinualTaskStmt<Aug>,
) -> Option<ast::Query<Aug>> {
    match stmt {
        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
            table_name: _,
            columns,
            source,
            returning,
        }) => {
            if !columns.is_empty() || !returning.is_empty() {
                return None;
            }
            match source {
                ast::InsertSource::Query(query) => Some(query.clone()),
                ast::InsertSource::DefaultValues => None,
            }
        }
        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
            table_name: _,
            alias,
            using,
            selection,
        }) => {
            if !using.is_empty() {
                return None;
            }
            let from = ast::TableWithJoins {
                relation: ast::TableFactor::Table {
                    name: ct_name.clone(),
                    alias: alias.clone(),
                },
                joins: Vec::new(),
            };
            let select = ast::Select {
                from: vec![from],
                selection: selection.clone(),
                distinct: None,
                projection: vec![ast::SelectItem::Wildcard],
                group_by: Vec::new(),
                having: None,
                qualify: None,
                options: Vec::new(),
            };
            let query = ast::Query {
                ctes: ast::CteBlock::Simple(Vec::new()),
                body: ast::SetExpr::Select(Box::new(select)),
                order_by: Vec::new(),
                limit: None,
                offset: None,
            };
            Some(query)
        }
    }
}

generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));

pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);

pub fn plan_create_sink(
    scx: &StatementContext,
    stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let Some(name) = stmt.name.clone() else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    plan_sink(scx, stmt)
}

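/// Plans the body of a `CREATE SINK` statement; `plan_create_sink` performs
/// the name and existence checks before delegating here.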
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let envelope = match envelope {
        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
        None => sql_bail!("ENVELOPE clause is required"),
    };

    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;
    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }
    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key: Some(key), .. }
        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
            let key_columns = key
                .key_columns
                .clone()
                .into_iter()
                .map(normalize::column_name)
                .collect::<Vec<_>>();
            let mut uniq = BTreeSet::new();
            for col in key_columns.iter() {
                if !uniq.insert(col) {
                    sql_bail!("duplicate column referenced in KEY: {}", col);
                }
            }
            let indices = key_columns
                .iter()
                .map(|col| -> anyhow::Result<usize> {
                    let name_idx =
                        desc.get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        return Err(
                            sql_err!("column referenced in KEY is ambiguous: {}", col).into()
                        );
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;
            let is_valid_key = desc
                .typ()
                .keys
                .iter()
                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));

            if !is_valid_key && envelope == SinkEnvelope::Upsert {
                if key.not_enforced {
                    scx.catalog
                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                            key: key_columns.clone(),
                            name: name.item.clone(),
                        })
                } else {
                    return Err(PlanError::UpsertSinkWithInvalidKey {
                        name: from_name.full_name_str(),
                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                        valid_keys: desc
                            .typ()
                            .keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    .map(|col| desc.get_name(*col).as_str().into())
                                    .collect()
                            })
                            .collect(),
                    });
                }
            }
            Some(indices)
        }
        CreateSinkConnection::Kafka { key: None, .. }
        | CreateSinkConnection::Iceberg { key: None, .. } => None,
    };

    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                SqlScalarType::Map { value_type, .. }
                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

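    // Also record the relation's first declared key, if any; the sink
    // builders receive it alongside any user-specified key columns.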
    let relation_key_indices = desc.typ().keys.get(0).cloned();

    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = SqlRelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
        )?,
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            options,
            ..
        } => iceberg_sink_builder(
            scx,
            connection,
            aws_connection,
            options,
            relation_key_indices,
            key_desc_and_indices,
        )?,
    };

    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
        commit_interval: _,
    } = with_options.try_into()?;

    let with_snapshot = snapshot.unwrap_or(true);
    let version = version.unwrap_or(0);

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}

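/// Builds the error reported when user-declared key columns conflict with the
/// relation's existing keys.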
fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");

    let existing_keys = desc
        .typ()
        .keys
        .iter()
        .map(|key_columns| {
            key_columns
                .iter()
                .map(|col| desc.get_name(*col).as_str())
                .join(", ")
        })
        .join(", ");

    sql_err!(
        "Key constraint ({}) conflicts with existing key ({})",
        user_keys,
        existing_keys
    )
}

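// A hand-written analogue of the structs that `generate_extracted_config!`
// produces; the `DOC ON` options require the custom merging logic in the
// `TryFrom` impl below.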
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    pub(crate) null_defaults: bool,
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}

impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}

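/// Builds the connection half of an Iceberg sink plan, checking that the
/// referenced connections are in fact an Iceberg catalog connection and an
/// AWS connection and that `TABLE` and `NAMESPACE` are provided.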
fn iceberg_sink_builder(
    scx: &StatementContext,
    catalog_connection: ResolvedItemName,
    aws_connection: ResolvedItemName,
    options: Vec<IcebergSinkConfigOption<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
    let catalog_connection_id = catalog_connection_item.id();
    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
    let aws_connection_id = aws_connection_item.id();
    if !matches!(
        catalog_connection_item.connection()?,
        Connection::IcebergCatalog(_)
    ) {
        sql_bail!(
            "{} is not an iceberg catalog connection",
            scx.catalog
                .resolve_full_name(catalog_connection_item.name())
                .to_string()
                .quoted()
        );
    };

    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
        sql_bail!(
            "{} is not an AWS connection",
            scx.catalog
                .resolve_full_name(aws_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    let IcebergSinkConfigOptionExtracted {
        table,
        namespace,
        seen: _,
    }: IcebergSinkConfigOptionExtracted = options.try_into()?;

    let Some(table) = table else {
        sql_bail!("Iceberg sink must specify TABLE");
    };
    let Some(namespace) = namespace else {
        sql_bail!("Iceberg sink must specify NAMESPACE");
    };

    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
        catalog_connection_id,
        catalog_connection: catalog_connection_id,
        aws_connection_id,
        aws_connection: aws_connection_id,
        table,
        namespace,
        relation_key_indices,
        key_desc_and_indices,
    }))
}

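/// Builds the connection half of a Kafka sink plan: validates the Kafka
/// connection and its options, derives key/value formats (including Avro
/// schema generation), and plans the optional `PARTITION BY` expression.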
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

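    // Shared validation of the CSR (schema registry) options used by both the
    // key and value Avro formats below.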
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };
3890
3891 let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3892 Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3893 Format::Bytes if desc.arity() == 1 => {
3894 let col_type = &desc.typ().column_types[0].scalar_type;
3895 if !mz_pgrepr::Value::can_encode_binary(col_type) {
3896 bail_unsupported!(format!(
3897 "BYTES format with non-encodable type: {:?}",
3898 col_type
3899 ));
3900 }
3901
3902 Ok(KafkaSinkFormatType::Bytes)
3903 }
3904 Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3905 Format::Bytes | Format::Text => {
3906 bail_unsupported!("BYTES or TEXT format with multiple columns")
3907 }
3908 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3909 Format::Avro(AvroSchema::Csr { csr_connection }) => {
3910 let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3911 let schema = if is_key {
3912 AvroSchemaGenerator::new(
3913 desc.clone(),
3914 false,
3915 options.key_doc_options,
3916 options.avro_key_fullname.as_deref().unwrap_or("row"),
3917 options.null_defaults,
3918 Some(sink_from),
3919 false,
3920 )?
3921 .schema()
3922 .to_string()
3923 } else {
3924 AvroSchemaGenerator::new(
3925 desc.clone(),
3926 matches!(envelope, SinkEnvelope::Debezium),
3927 options.value_doc_options,
3928 options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3929 options.null_defaults,
3930 Some(sink_from),
3931 true,
3932 )?
3933 .schema()
3934 .to_string()
3935 };
3936 Ok(KafkaSinkFormatType::Avro {
3937 schema,
3938 compatibility_level: if is_key {
3939 options.key_compatibility_level
3940 } else {
3941 options.value_compatibility_level
3942 },
3943 csr_connection,
3944 })
3945 }
3946 format => bail_unsupported!(format!("sink format {:?}", format)),
3947 };
3948
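    // PARTITION BY is planned as a scalar expression over the sink's value
    // relation and assignment-cast to uint64. Under ENVELOPE DEBEZIUM only
    // key columns may be referenced, since only the key is guaranteed to be
    // present for every kind of update.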
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated()?;

            Some(expr)
        }
        _ => None,
    };

    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}

pub fn describe_create_index(
    _: &StatementContext,
    _: CreateIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

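/// Plans a `CREATE INDEX` statement.
///
/// A missing key defaults to the indexed relation's default key, and a
/// missing name is derived from the indexed object and the key columns. For
/// illustration, hypothetical statements (sketches, not output of this code):
///
/// ```sql
/// -- Indexes t on its default key; the derived name would be t_primary_idx.
/// CREATE DEFAULT INDEX ON t;
/// -- Indexes t on column a; the derived name would be t_a_idx.
/// CREATE INDEX ON t (a);
/// ```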
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;
    let on_item_type = on.item_type();

    if !matches!(
        on_item_type,
        CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Source
            | CatalogItemType::Table
    ) {
        sql_bail!(
            "index cannot be created on {} because it is a {}",
            on_name.full_name_str(),
            on.item_type()
        )
    }

    let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            on.desc(&scx.catalog.resolve_full_name(on.name()))?
                .typ()
                .default_key()
                .iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            idx_name.item += "_primary_idx";
        } else {
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}

pub fn describe_create_type(
    _: &StatementContext,
    _: CreateTypeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

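/// Plans a `CREATE TYPE` statement (`AS LIST`, `AS MAP`, or as a record).
///
/// Element, key, and value types must themselves be named types; anonymous
/// types must first be given a name via `CREATE TYPE`. For illustration,
/// hypothetical statements (sketches, not output of this code):
///
/// ```sql
/// CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
/// CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);
/// ```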
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                 found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}

generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);

#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}

#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    pub password: Option<Password>,
    pub scram_iterations: Option<NonZeroU32>,
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}

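/// Plans the attribute list of `CREATE ROLE` / `ALTER ROLE`, rejecting
/// conflicting or redundant combinations such as `SUPERUSER NOSUPERUSER`.
/// The `CREATECLUSTER`, `CREATEDB`, and `CREATEROLE` attributes are never
/// supported; system privileges serve that purpose instead. `NOINHERIT`
/// parses but is not yet supported.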
fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}

#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}

impl PlannedRoleVariable {
    pub fn name(&self) -> &str {
        match self {
            PlannedRoleVariable::Set { name, .. } => name,
            PlannedRoleVariable::Reset { name } => name,
        }
    }
}

fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
    let plan = match variable {
        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
            name: name.to_string(),
            value: scl::plan_set_variable_to(value)?,
        },
        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
            name: name.to_string(),
        },
    };
    Ok(plan)
}

pub fn describe_create_role(
    _: &StatementContext,
    _: CreateRoleStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_role(
    scx: &StatementContext,
    CreateRoleStatement { name, options }: CreateRoleStatement,
) -> Result<Plan, PlanError> {
    let attributes = plan_role_attributes(options, scx)?;
    Ok(Plan::CreateRole(CreateRolePlan {
        name: normalize::ident(name),
        attributes: attributes.into(),
    }))
}

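/// Plans `CREATE NETWORK POLICY`. Each rule must supply all of DIRECTION,
/// ACTION, and ADDRESS. For illustration, a hypothetical statement (names
/// and exact option spelling are illustrative, not output of this code):
///
/// ```sql
/// CREATE NETWORK POLICY office_access (
///     RULES (
///         office (DIRECTION = 'ingress', ACTION = 'allow', ADDRESS = '10.0.0.0/8')
///     )
/// );
/// ```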
pub fn plan_create_network_policy(
    ctx: &StatementContext,
    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when creating network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;
        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Action, and Address must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }

    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
        name: normalize::ident(name),
        rules,
    }))
}

pub fn plan_alter_network_policy(
    ctx: &StatementContext,
    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;

    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when altering network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;

        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Action, and Address must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }
    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
        id: policy.id(),
        name: normalize::ident(name),
        rules,
    }))
}

pub fn describe_create_cluster(
    _: &StatementContext,
    _: CreateClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);

pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

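    // For managed clusters, round-trip the plan through `unplan_create_cluster`,
    // then re-parse, re-resolve, and re-plan the resulting SQL, and require the
    // result to be identical. This catches `plan_create_cluster_inner` and
    // `unplan_create_cluster` drifting out of sync.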
    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}

pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    let managed = managed.unwrap_or_else(|| replicas.is_none());

    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}

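/// Converts a `CreateClusterPlan` back into a `CREATE CLUSTER` statement.
/// This is the inverse of `plan_create_cluster_inner` for managed clusters
/// and is exercised by the round-trip check in `plan_create_cluster`.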
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_reduce_reduction: _,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_repr_typecheck: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}

generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);

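/// Plans the options of a single cluster replica. A replica is either
/// orchestrated (SIZE required; AVAILABILITY ZONE and BILLED AS optional) or
/// unorchestrated (explicit STORAGECTL/COMPUTECTL ADDRESSES, gated behind an
/// unsafe feature flag); mixing options from the two modes is rejected.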
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}

fn plan_compute_replica_config(
    introspection_interval: Option<OptionalDuration>,
    introspection_debugging: bool,
) -> Result<ComputeReplicaConfig, PlanError> {
    let introspection_interval = introspection_interval
        .map(|OptionalDuration(i)| i)
        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
    let introspection = match introspection_interval {
        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
            interval,
            debugging: introspection_debugging,
        }),
        None if introspection_debugging => {
            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
        }
        None => None,
    };
    let compute = ComputeReplicaConfig { introspection };
    Ok(compute)
}

fn unplan_compute_replica_config(
    compute_replica_config: ComputeReplicaConfig,
) -> (Option<OptionalDuration>, bool) {
    match compute_replica_config.introspection {
        Some(ComputeReplicaIntrospectionConfig {
            debugging,
            interval,
        }) => (Some(OptionalDuration(Some(interval))), debugging),
        None => (Some(OptionalDuration(None)), false),
    }
}

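/// Plans the SCHEDULE option of a cluster. The hydration time estimate must
/// be non-negative, must not use units larger than days, and must fit in
/// both a millisecond `u64` and an `Interval`. For illustration, a
/// hypothetical statement (size value is illustrative):
///
/// ```sql
/// CREATE CLUSTER c (SIZE = '100cc', SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour'));
/// ```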
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            if interval.months != 0 {
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}

fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
    match schedule {
        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
        ClusterSchedule::Refresh {
            hydration_time_estimate,
        } => {
            let interval = Interval::from_duration(&hydration_time_estimate)
                .expect("planning ensured that this is convertible back to Interval");
            let interval_value = literal::unplan_interval(&interval);
            ClusterScheduleOptionValue::Refresh {
                hydration_time_estimate: Some(interval_value),
            }
        }
    }
}

pub fn describe_create_cluster_replica(
    _: &StatementContext,
    _: CreateClusterReplicaStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

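/// Plans `CREATE CLUSTER REPLICA`. Adding a replica is rejected when the
/// cluster already has one and contains sources or sinks (unless
/// multi-replica sources are enabled), and non-internal replicas may only be
/// added to unmanaged clusters.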
pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}

pub fn describe_create_secret(
    _: &StatementContext,
    _: CreateSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

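/// Plans `CREATE SECRET`. The secret's value is replaced with `********` in
/// the statement recorded as the item's `create_sql`, so the plaintext never
/// reaches the catalog.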
pub fn plan_create_secret(
    scx: &StatementContext,
    stmt: CreateSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSecretStatement {
        name,
        if_not_exists,
        value,
    } = &stmt;

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
    let mut create_sql_statement = stmt.clone();
    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
    let create_sql =
        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
    let secret_as = query::plan_secret_as(scx, value.clone())?;

    let secret = Secret {
        create_sql,
        secret_as,
    };

    Ok(Plan::CreateSecret(CreateSecretPlan {
        name,
        secret,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_connection(
    _: &StatementContext,
    _: CreateConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(CreateConnectionOption, (Validate, bool));

pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}

fn plan_drop_database(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedDatabaseName,
    cascade: bool,
) -> Result<Option<DatabaseId>, PlanError> {
    Ok(match resolve_database(scx, name, if_exists)? {
        Some(database) => {
            if !cascade && database.has_schemas() {
                sql_bail!(
                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
                    name,
                );
            }
            Some(database.id())
        }
        None => None,
    })
}

pub fn describe_drop_objects(
    _: &StatementContext,
    _: DropObjectsStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}

fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
5464 "cannot drop role {}: still depended up by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

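/// Reports whether the cluster contains objects (sources or sinks) that
/// restrict it to a single replica. Always `false` when the
/// `ENABLE_MULTI_REPLICA_SOURCES` dyncfg is enabled.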
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        cluster.bound_objects().iter().any(|id| {
            let item = scx.catalog.get_item(id);
            matches!(
                item.item_type(),
                CatalogItemType::Sink | CatalogItemType::Source
            )
        })
    }
}

fn plan_drop_cluster_replica(
    scx: &StatementContext,
    if_exists: bool,
    name: &QualifiedReplica,
) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
}

fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}

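/// Reports whether a dependency `dep` prevents dropping an object of
/// `object_type` without CASCADE. Types are always protected; for everything
/// else, every dependent item type except indexes prevents the drop.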
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        _ => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            CatalogItemType::Index => false,
        },
    }
}

pub fn describe_alter_index_options(
    _: &StatementContext,
    _: AlterIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_drop_owned(
    _: &StatementContext,
    _: DropOwnedStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

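/// Plans `DROP OWNED BY`. This walks replicas, clusters, items, schemas, and
/// databases, collecting (a) objects owned by any of the given roles and
/// (b) privileges and default privileges granted to those roles, so the plan
/// can drop the former and revoke the latter together.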
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string(),
5737 dependents: names,
5738 })
5739 } else {
5740 Ok(())
5741 }
5742 };
5743
5744 if let Some(id) = item.progress_id() {
5747 let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}

fn plan_retain_history_option(
    scx: &StatementContext,
    retain_history: Option<OptionalDuration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    if let Some(OptionalDuration(lcw)) = retain_history {
        Ok(Some(plan_retain_history(scx, lcw)?))
    } else {
        Ok(None)
    }
}

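/// Plans a `RETAIN HISTORY` duration as a [`CompactionWindow`].
///
/// A zero duration is rejected as an internal error. Durations below
/// `DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION`, and an absent duration
/// (which disables compaction entirely), are only accepted when the
/// `ENABLE_UNLIMITED_RETAIN_HISTORY` feature flag is set.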
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

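/// Plans the `WITH (...)` options of `CREATE INDEX`. Supplying any option at
/// all requires the `ENABLE_INDEX_OPTIONS` feature flag; only `RETAIN HISTORY`
/// contributes to the returned plan options.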
fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

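/// Plans the `WITH (...)` options of `CREATE TABLE`. `PARTITION BY` and the
/// redacted-test option are validated behind feature flags, but only
/// `RETAIN HISTORY` contributes to the returned plan options.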
fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

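/// Plans `ALTER INDEX ... SET/RESET (...)`. `RETAIN HISTORY` is currently the
/// only alterable index option, and it must appear by itself.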
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

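/// Plans `ALTER CLUSTER`, e.g. (illustrative syntax)
/// `ALTER CLUSTER c SET (REPLICATION FACTOR = 2)`. The requested options are
/// validated against the cluster's managed/unmanaged mode before an
/// [`AlterClusterPlan`] is assembled.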
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                 cluster's SCHEDULE is anything other than MANUAL, you have to \
                                 explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

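/// Plans `ALTER ... SET CLUSTER`, e.g. (illustrative syntax)
/// `ALTER MATERIALIZED VIEW mv SET CLUSTER c`. Only materialized views are
/// currently supported, and moving an object to the cluster it already lives
/// on is a no-op.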
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

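/// Plans a schema swap. Because the swap passes through a temporary
/// `mz_schema_swap_<suffix>` name, the caller supplies `gen_temp_suffix`,
/// which must produce a suffix for which the given availability check
/// succeeds.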
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

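/// Plans `ALTER <object> RENAME TO`, rejecting the new name if it would
/// collide with an existing item, or with an existing type when the item's
/// kind shares the type namespace.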
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

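/// Plans a cluster swap; as with schema swaps, the two clusters are exchanged
/// through a temporary `mz_cluster_swap_<suffix>` name whose availability is
/// verified via `gen_temp_suffix`.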
pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            Err(CatalogError::UnknownCluster(_)) => true,
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

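/// Plans `ALTER ... SWAP WITH ...` (feature gated) for schemas and clusters.
/// Up to ten random temporary-name suffixes are tried before giving up, since
/// the swap needs a name that is free in the relevant namespace.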
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

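/// Plans `ALTER ... SET (RETAIN HISTORY ...)` and the corresponding `RESET`:
/// an absent history value resets the compaction window to
/// `DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION`.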
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

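/// Plans `ALTER CONNECTION`, e.g. (illustrative syntax)
/// `ALTER CONNECTION conn SET (PORT = 5433)`. `ROTATE KEYS` is only valid for
/// SSH connections and must appear on its own; otherwise the SET/DROP actions
/// are validated against the connection type before building the plan.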
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
6847 });
6848
6849 return Ok(Plan::AlterNoop(AlterNoopPlan {
6850 object_type: ObjectType::Connection,
6851 }));
6852 }
6853 Err(e) => return Err(e.into()),
6854 };
6855
6856 let connection = entry.connection()?;
6857
6858 if actions
6859 .iter()
6860 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6861 {
6862 if actions.len() > 1 {
6863 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6864 }
6865
6866 if !with_options.is_empty() {
6867 sql_bail!(
6868 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6869 with_options
6870 .iter()
6871 .map(|o| o.to_ast_string_simple())
6872 .join(", ")
6873 );
6874 }
6875
6876 if !matches!(connection, Connection::Ssh(_)) {
6877 sql_bail!(
6878 "{} is not an SSH connection",
6879 scx.catalog.resolve_full_name(entry.name())
6880 )
6881 }
6882
6883 return Ok(Plan::AlterConnection(AlterConnectionPlan {
6884 id: entry.id(),
6885 action: crate::plan::AlterConnectionAction::RotateKeys,
6886 }));
6887 }
6888
6889 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6890 if options.validate.is_some() {
6891 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6892 }
6893
6894 let validate = match options.validate {
6895 Some(val) => val,
6896 None => {
6897 scx.catalog
6898 .system_vars()
6899 .enable_default_connection_validation()
6900 && connection.validate_by_default()
6901 }
6902 };
6903
6904 let connection_type = match connection {
6905 Connection::Aws(_) => CreateConnectionType::Aws,
6906 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6907 Connection::Kafka(_) => CreateConnectionType::Kafka,
6908 Connection::Csr(_) => CreateConnectionType::Csr,
6909 Connection::Postgres(_) => CreateConnectionType::Postgres,
6910 Connection::Ssh(_) => CreateConnectionType::Ssh,
6911 Connection::MySql(_) => CreateConnectionType::MySql,
6912 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6913 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
6914 };
6915
6916 let specified_options: BTreeSet<_> = actions
6918 .iter()
6919 .map(|action: &AlterConnectionAction<Aug>| match action {
6920 AlterConnectionAction::SetOption(option) => option.name.clone(),
6921 AlterConnectionAction::DropOption(name) => name.clone(),
6922 AlterConnectionAction::RotateKeys => unreachable!(),
6923 })
6924 .collect();
6925
6926 for invalid in INALTERABLE_OPTIONS {
6927 if specified_options.contains(invalid) {
6928 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6929 }
6930 }
6931
6932 connection::validate_options_per_connection_type(connection_type, specified_options)?;
6933
6934 let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
6936 actions.into_iter().partition_map(|action| match action {
6937 AlterConnectionAction::SetOption(option) => Either::Left(option),
6938 AlterConnectionAction::DropOption(name) => Either::Right(name),
6939 AlterConnectionAction::RotateKeys => unreachable!(),
6940 });
6941
6942 let set_options: BTreeMap<_, _> = set_options_vec
6943 .clone()
6944 .into_iter()
6945 .map(|option| (option.name, option.value))
6946 .collect();
6947
6948 let connection_options_extracted =
6952 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
6953
6954 let duplicates: Vec<_> = connection_options_extracted
6955 .seen
6956 .intersection(&drop_options)
6957 .collect();
6958
6959 if !duplicates.is_empty() {
6960 sql_bail!(
6961 "cannot both SET and DROP/RESET options {}",
6962 duplicates
6963 .iter()
6964 .map(|option| option.to_string())
6965 .join(", ")
6966 )
6967 }
6968
6969 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
6970 let set_options_count = mutually_exclusive_options
6971 .iter()
6972 .filter(|o| set_options.contains_key(o))
6973 .count();
6974 let drop_options_count = mutually_exclusive_options
6975 .iter()
6976 .filter(|o| drop_options.contains(o))
6977 .count();
6978
6979 if set_options_count > 0 && drop_options_count > 0 {
6981 sql_bail!(
6982 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
6983 connection_type,
6984 mutually_exclusive_options
6985 .iter()
6986 .map(|option| option.to_string())
6987 .join(", ")
6988 )
6989 }
6990
6991 if set_options_count > 0 || drop_options_count > 0 {
6996 drop_options.extend(mutually_exclusive_options.iter().cloned());
6997 }
6998
6999 }
7002
7003 Ok(Plan::AlterConnection(AlterConnectionPlan {
7004 id: entry.id(),
7005 action: crate::plan::AlterConnectionAction::AlterOptions {
7006 set_options,
7007 drop_options,
7008 validate,
7009 },
7010 }))
7011}
7012
7013pub fn describe_alter_sink(
7014 _: &StatementContext,
7015 _: AlterSinkStatement<Aug>,
7016) -> Result<StatementDesc, PlanError> {
7017 Ok(StatementDesc::new(None))
7018}
7019
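/// Plans `ALTER SINK`. Changing the sink's input relation is implemented by
/// re-parsing the sink's original `CREATE SINK` statement, substituting the
/// new `FROM` relation, re-planning the statement, and bumping the sink
/// version.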
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

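/// Plans `ALTER SOURCE`. Only the `RETAIN HISTORY` option may be set or reset
/// here; adding subsources and refreshing references are handled during
/// purification, and dropping subsources is no longer supported.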
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_role(
    scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs, scx)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

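/// Plans `ALTER TABLE ... ADD COLUMN ...` (feature gated). The new column is
/// always added as nullable, and the raw SQL type is retained alongside the
/// planned column type.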
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    let column_type = scalar_type.nullable(true);
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

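/// Plans `COMMENT ON ...`, e.g. (illustrative syntax)
/// `COMMENT ON TABLE t IS 'docs'`. Comments are capped at
/// `MAX_COMMENT_LENGTH` bytes, and column comments record a 1-based column
/// position as the comment's sub-component.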
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

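/// Resolves an item (or a type, when `object_type` is `Type`) by name,
/// returning `Ok(None)` if it does not exist and `if_exists` is set, and
/// [`PlanError::MismatchedObjectType`] if it exists with a different type.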
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}