use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, IcebergSinkConfigOption, Ident,
    IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, KeyConstraint,
    LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption,
    MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, NetworkPolicyOption,
    NetworkPolicyOptionName, NetworkPolicyRuleDefinition, NetworkPolicyRuleOption,
    NetworkPolicyRuleOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema,
    QualifiedReplica, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue,
    ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar,
    SourceErrorPolicy, SourceIncludeMetadata, SqlServerConfigOption, SqlServerConfigOptionName,
    Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption,
    TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName,
    UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
    WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
    PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
    Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
    WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
    transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

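/// The maximum number of columns a relation may have (enforced when planning
/// column lists, e.g. for webhook sources).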
const MAX_NUM_COLUMNS: usize = 256;

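/// Matches the default name format of managed cluster replicas, e.g. `r1`.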
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

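/// Validates a `PARTITION BY` column list against a relation: the columns must
/// form a prefix of the relation's columns, and each must have an
/// order-preserving type.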
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

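// The `describe_*` handlers below compute each statement's result description;
// DDL statements return no rows, so they all report an empty `StatementDesc`.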
pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

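/// Plans `CREATE TABLE`, deriving the table's column types, default
/// expressions, keys, and any versioned column changes.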
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

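    // Fold `UNIQUE` and `PRIMARY KEY` table constraints into the key set,
    // forcing primary key columns to be `NOT NULL`.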
    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

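// Typed extraction of the `WITH (...)` options accepted by `CREATE SOURCE`
// and the per-connection configuration options.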
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

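/// Plans a webhook source, or a webhook-backed table when `is_table` is set,
/// assembling its body column, optional header columns, and request
/// validation.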
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources && in_cluster.replica_ids().len() > 1 {
        sql_bail!("cannot create webhook source in cluster with more than one replica")
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

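    // A validation expression must actually inspect the request (reference at
    // least one column) and must be deterministic, except for `now()`.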
    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

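    // Reject duplicate column names and enforce the column count limit.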
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

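/// Plans `CREATE SOURCE`: plans the external connection, envelope, and
/// encoding, and derives the relation description, timeline, and cluster for
/// the new source.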
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

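    // When the new source table syntax is forced, the legacy inline
    // ENVELOPE/FORMAT/INCLUDE options must instead be specified on
    // `CREATE TABLE ... FROM SOURCE` statements.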
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

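    // PRIMARY KEY NOT ENFORCED declares a key without verifying uniqueness;
    // it is feature-gated, and the named columns must exist, be unambiguous,
    // and not repeat.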
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

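    // With the legacy syntax a named progress subsource exists and this
    // relation carries the ingested data; otherwise this relation describes
    // the ingestion's progress (its timestamp columns).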
    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
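    // Refuse to create the source if an item or type of the same name already
    // exists, unless `IF NOT EXISTS` was specified.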
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

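/// Plans the external connection for a `CREATE SOURCE` statement by
/// dispatching on the connection family (Kafka, Postgres, SQL Server, MySQL,
/// or load generator).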
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

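/// Plans a MySQL source: validates that the named connection is a MySQL
/// connection and decodes the hex-encoded details produced by purification.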
fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

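/// Plans a SQL Server source: validates the connection and decodes the
/// hex-encoded extras produced by purification.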
fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

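/// Plans a Postgres source: decodes the hex-encoded publication details
/// produced by purification.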
fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

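/// Plans a Kafka source: validates the connection, start offsets, metadata
/// refresh interval, and the requested metadata columns.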
fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _,
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => None,
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

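/// Derives the relation description, planned envelope, and optional encoding
/// for a source or source-fed table from its declared format, envelope, and
/// included metadata columns.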
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

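    // A format decodes a raw stream of bytes, so it may only be applied when
    // the connection's default value relation is a single `bytes` column; the
    // format then supplies the key and value descriptions.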
    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

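    // The key-value load generator produces keys without any key format, so
    // `INCLUDE KEY` is allowed for it even though no key encoding exists.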
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

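/// Plans the explicitly declared columns and constraints of a subsource or
/// source-fed table into a `RelationDesc`.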
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

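/// Plans `CREATE SUBSOURCE`. A subsource either tracks a source's progress or
/// exports a single external reference (e.g. an upstream table) of an
/// existing source.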
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);

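/// Plans `CREATE TABLE ... FROM SOURCE`, the source table syntax for exporting
/// data from an existing source into a table.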
1734pub fn plan_create_table_from_source(
1735 scx: &StatementContext,
1736 stmt: CreateTableFromSourceStatement<Aug>,
1737) -> Result<Plan, PlanError> {
1738 if !scx.catalog.system_vars().enable_create_table_from_source() {
1739 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1740 }
1741
1742 let CreateTableFromSourceStatement {
1743 name,
1744 columns,
1745 constraints,
1746 if_not_exists,
1747 source,
1748 external_reference,
1749 envelope,
1750 format,
1751 include_metadata,
1752 with_options,
1753 } = &stmt;
1754
1755 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1756
1757 let TableFromSourceOptionExtracted {
1758 text_columns,
1759 exclude_columns,
1760 retain_history,
1761 partition_by,
1762 details,
1763 seen: _,
1764 } = with_options.clone().try_into()?;
1765
1766 let source_item = scx.get_item_by_resolved_name(source)?;
1767 let ingestion_id = source_item.id();
1768
1769 let details = details
1772 .as_ref()
1773 .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1774 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1775 let details =
1776 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1777 let details =
1778 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1779
1780 if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1781 && include_metadata
1782 .iter()
1783 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1784 {
1785 sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1787 }
1788 if !matches!(
1789 details,
1790 SourceExportStatementDetails::Kafka { .. }
1791 | SourceExportStatementDetails::LoadGenerator { .. }
1792 ) && !include_metadata.is_empty()
1793 {
1794 bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1795 }
1796
1797 let details = match details {
1798 SourceExportStatementDetails::Postgres { table } => {
1799 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1800 column_casts: crate::pure::postgres::generate_column_casts(
1801 scx,
1802 &table,
1803 &text_columns,
1804 )?,
1805 table,
1806 })
1807 }
1808 SourceExportStatementDetails::MySql {
1809 table,
1810 initial_gtid_set,
1811 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1812 table,
1813 initial_gtid_set,
1814 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1815 exclude_columns: exclude_columns
1816 .into_iter()
1817 .map(|c| c.into_string())
1818 .collect(),
1819 }),
1820 SourceExportStatementDetails::SqlServer {
1821 table,
1822 capture_instance,
1823 initial_lsn,
1824 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1825 table,
1826 capture_instance,
1827 initial_lsn,
1828 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1829 exclude_columns: exclude_columns
1830 .into_iter()
1831 .map(|c| c.into_string())
1832 .collect(),
1833 }),
1834 SourceExportStatementDetails::LoadGenerator { output } => {
1835 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1836 }
1837 SourceExportStatementDetails::Kafka {} => {
1838 if !include_metadata.is_empty()
1839 && !matches!(
1840 envelope,
1841 ast::SourceEnvelope::Upsert { .. }
1842 | ast::SourceEnvelope::None
1843 | ast::SourceEnvelope::Debezium
1844 )
1845 {
1846 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1848 }
1849
1850 let metadata_columns = include_metadata
1851 .into_iter()
1852 .flat_map(|item| match item {
1853 SourceIncludeMetadata::Timestamp { alias } => {
1854 let name = match alias {
1855 Some(name) => name.to_string(),
1856 None => "timestamp".to_owned(),
1857 };
1858 Some((name, KafkaMetadataKind::Timestamp))
1859 }
1860 SourceIncludeMetadata::Partition { alias } => {
1861 let name = match alias {
1862 Some(name) => name.to_string(),
1863 None => "partition".to_owned(),
1864 };
1865 Some((name, KafkaMetadataKind::Partition))
1866 }
1867 SourceIncludeMetadata::Offset { alias } => {
1868 let name = match alias {
1869 Some(name) => name.to_string(),
1870 None => "offset".to_owned(),
1871 };
1872 Some((name, KafkaMetadataKind::Offset))
1873 }
1874 SourceIncludeMetadata::Headers { alias } => {
1875 let name = match alias {
1876 Some(name) => name.to_string(),
1877 None => "headers".to_owned(),
1878 };
1879 Some((name, KafkaMetadataKind::Headers))
1880 }
1881 SourceIncludeMetadata::Header {
1882 alias,
1883 key,
1884 use_bytes,
1885 } => Some((
1886 alias.to_string(),
1887 KafkaMetadataKind::Header {
1888 key: key.clone(),
1889 use_bytes: *use_bytes,
1890 },
1891 )),
1892 SourceIncludeMetadata::Key { .. } => {
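                        // `INCLUDE KEY` is surfaced through the key envelope
                        // rather than as a metadata column, so there is
                        // nothing to emit here.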
1893 None
1895 }
1896 })
1897 .collect();
1898
1899 SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1900 }
1901 };
1902
1903 let source_connection = &source_item.source_desc()?.expect("is source").connection;
1904
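    // If the statement defines its own columns or constraints, build the
    // relation description from those; otherwise fall back to the source
    // connection's default key and value descriptions.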
1905 let (key_desc, value_desc) =
1910 if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1911 let columns = match columns {
1912 TableFromSourceColumns::Defined(columns) => columns,
1913 _ => unreachable!(),
1914 };
1915 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1916 (None, desc)
1917 } else {
1918 let key_desc = source_connection.default_key_desc();
1919 let value_desc = source_connection.default_value_desc();
1920 (Some(key_desc), value_desc)
1921 };
1922
1923 let metadata_columns_desc = match &details {
1924 SourceExportDetails::Kafka(KafkaSourceExportDetails {
1925 metadata_columns, ..
1926 }) => kafka_metadata_columns_desc(metadata_columns),
1927 _ => vec![],
1928 };
1929
1930 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1931 scx,
1932 &envelope,
1933 format,
1934 key_desc,
1935 value_desc,
1936 include_metadata,
1937 metadata_columns_desc,
1938 source_connection,
1939 )?;
1940 if let TableFromSourceColumns::Named(col_names) = columns {
1941 plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1942 }
1943
1944 let names: Vec<_> = desc.iter_names().cloned().collect();
1945 if let Some(dup) = names.iter().duplicates().next() {
1946 sql_bail!("column {} specified more than once", dup.quoted());
1947 }
1948
1949 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1950
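    // CDCv2 (ENVELOPE MATERIALIZE) data carries its own timestamps, so it
    // gets a dedicated external timeline; everything else lives on the
    // default epoch-milliseconds timeline.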
1951 let timeline = match envelope {
1954 SourceEnvelope::CdcV2 => {
1955 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1956 }
1957 _ => Timeline::EpochMilliseconds,
1958 };
1959
1960 if let Some(partition_by) = partition_by {
1961 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1962 check_partition_by(&desc, partition_by)?;
1963 }
1964
1965 let data_source = DataSourceDesc::IngestionExport {
1966 ingestion_id,
1967 external_reference: external_reference
1968 .as_ref()
1969 .expect("populated in purification")
1970 .clone(),
1971 details,
1972 data_config: SourceExportDataConfig { envelope, encoding },
1973 };
1974
1975 let if_not_exists = *if_not_exists;
1976
1977 let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1978
1979 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1980 let table = Table {
1981 create_sql,
1982 desc: VersionedRelationDesc::new(desc),
1983 temporary: false,
1984 compaction_window,
1985 data_source: TableDataSource::DataSource {
1986 desc: data_source,
1987 timeline,
1988 },
1989 };
1990
1991 Ok(Plan::CreateTable(CreateTablePlan {
1992 name,
1993 table,
1994 if_not_exists,
1995 }))
1996}
1997
1998generate_extracted_config!(
1999 LoadGeneratorOption,
2000 (TickInterval, Duration),
2001 (AsOf, u64, Default(0_u64)),
2002 (UpTo, u64, Default(u64::MAX)),
2003 (ScaleFactor, f64),
2004 (MaxCardinality, u64),
2005 (Keys, u64),
2006 (SnapshotRounds, u64),
2007 (TransactionalSnapshot, bool),
2008 (ValueSize, u64),
2009 (Seed, u64),
2010 (Partitions, u64),
2011 (BatchSize, u64)
2012);
2013
2014impl LoadGeneratorOptionExtracted {
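    /// Returns an error if any of the extracted options is not supported by
    /// the given load generator variant.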
2015 pub(super) fn ensure_only_valid_options(
2016 &self,
2017 loadgen: &ast::LoadGenerator,
2018 ) -> Result<(), PlanError> {
2019 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2020
2021 let mut options = self.seen.clone();
2022
2023 let permitted_options: &[_] = match loadgen {
2024 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2025 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2026 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2027 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2028 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2029 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2030 ast::LoadGenerator::KeyValue => &[
2031 TickInterval,
2032 Keys,
2033 SnapshotRounds,
2034 TransactionalSnapshot,
2035 ValueSize,
2036 Seed,
2037 Partitions,
2038 BatchSize,
2039 ],
2040 };
2041
2042 for o in permitted_options {
2043 options.remove(o);
2044 }
2045
2046 if !options.is_empty() {
2047 sql_bail!(
2048 "{} load generators do not support {} values",
2049 loadgen,
2050 options.iter().join(", ")
2051 )
2052 }
2053
2054 Ok(())
2055 }
2056}
2057
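/// Converts the AST of a `LOAD GENERATOR` source into a [`LoadGenerator`],
/// validating that the options and `INCLUDE` metadata items are supported by
/// the chosen generator. For example (assuming the relevant feature flag is
/// enabled), `CREATE SOURCE c FROM LOAD GENERATOR COUNTER (TICK INTERVAL
/// '1s')` maps to `LoadGenerator::Counter`.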
2058pub(crate) fn load_generator_ast_to_generator(
2059 scx: &StatementContext,
2060 loadgen: &ast::LoadGenerator,
2061 options: &[LoadGeneratorOption<Aug>],
2062 include_metadata: &[SourceIncludeMetadata],
2063) -> Result<LoadGenerator, PlanError> {
2064 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2065 extracted.ensure_only_valid_options(loadgen)?;
2066
2067 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2068 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2069 }
2070
2071 let load_generator = match loadgen {
2072 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2073 ast::LoadGenerator::Clock => {
2074 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2075 LoadGenerator::Clock
2076 }
2077 ast::LoadGenerator::Counter => {
2078 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2079 let LoadGeneratorOptionExtracted {
2080 max_cardinality, ..
2081 } = extracted;
2082 LoadGenerator::Counter { max_cardinality }
2083 }
2084 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2085 ast::LoadGenerator::Datums => {
2086 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2087 LoadGenerator::Datums
2088 }
2089 ast::LoadGenerator::Tpch => {
2090 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2091
2092 let sf: f64 = scale_factor.unwrap_or(0.01);
2094 if !sf.is_finite() || sf < 0.0 {
2095 sql_bail!("unsupported scale factor {sf}");
2096 }
2097
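            // Scale each TPC-H base cardinality by the scale factor,
            // clamping the result to at least one row.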
2098 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2099 let total = (sf * multiplier).floor();
2100 let mut i = i64::try_cast_from(total)
2101 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2102 if i < 1 {
2103 i = 1;
2104 }
2105 Ok(i)
2106 };
2107
2108 let count_supplier = f_to_i(10_000f64)?;
2111 let count_part = f_to_i(200_000f64)?;
2112 let count_customer = f_to_i(150_000f64)?;
2113 let count_orders = f_to_i(150_000f64 * 10f64)?;
2114 let count_clerk = f_to_i(1_000f64)?;
2115
2116 LoadGenerator::Tpch {
2117 count_supplier,
2118 count_part,
2119 count_customer,
2120 count_orders,
2121 count_clerk,
2122 }
2123 }
2124 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2125 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2126 let LoadGeneratorOptionExtracted {
2127 keys,
2128 snapshot_rounds,
2129 transactional_snapshot,
2130 value_size,
2131 tick_interval,
2132 seed,
2133 partitions,
2134 batch_size,
2135 ..
2136 } = extracted;
2137
2138 let mut include_offset = None;
2139 for im in include_metadata {
2140 match im {
2141 SourceIncludeMetadata::Offset { alias } => {
2142 include_offset = match alias {
2143 Some(alias) => Some(alias.to_string()),
2144 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2145 }
2146 }
2147 SourceIncludeMetadata::Key { .. } => continue,
2148
2149 _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2151 }
2152 };
2153 }
2154
2155 let lgkv = KeyValueLoadGenerator {
2156 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2157 snapshot_rounds: snapshot_rounds
2158 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2159 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2161 value_size: value_size
2162 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2163 partitions: partitions
2164 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2165 tick_interval,
2166 batch_size: batch_size
2167 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2168 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2169 include_offset,
2170 };
2171
2172 if lgkv.keys == 0
2173 || lgkv.partitions == 0
2174 || lgkv.value_size == 0
2175 || lgkv.batch_size == 0
2176 {
2177 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2178 }
2179
2180 if lgkv.keys % lgkv.partitions != 0 {
2181 sql_bail!("KEYS must be a multiple of PARTITIONS")
2182 }
2183
2184 if lgkv.batch_size > lgkv.keys {
                sql_bail!("KEYS must be greater than or equal to BATCH SIZE")
2186 }
2187
2188 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2191 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2192 }
2193
2194 if lgkv.snapshot_rounds == 0 {
2195 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2196 }
2197
2198 LoadGenerator::KeyValue(lgkv)
2199 }
2200 };
2201
2202 Ok(load_generator)
2203}
2204
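/// Checks that a Debezium-enveloped value relation has the expected shape,
/// returning the indices of the optional `before` column and the required
/// `after` column.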
2205fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2206 let before = value_desc.get_by_name(&"before".into());
2207 let (after_idx, after_ty) = value_desc
2208 .get_by_name(&"after".into())
2209 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2210 let before_idx = if let Some((before_idx, before_ty)) = before {
2211 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2212 sql_bail!("'before' column must be of type record");
2213 }
2214 if before_ty != after_ty {
            sql_bail!("'before' column type differs from 'after' column type");
2216 }
2217 Some(before_idx)
2218 } else {
2219 None
2220 };
2221 Ok((before_idx, after_idx))
2222}
2223
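/// Plans the key and value encodings for a `FORMAT` specifier, verifying
/// that envelopes which require a key (`UPSERT`, `DEBEZIUM`) actually have a
/// key encoding.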
2224fn get_encoding(
2225 scx: &StatementContext,
2226 format: &FormatSpecifier<Aug>,
2227 envelope: &ast::SourceEnvelope,
2228) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2229 let encoding = match format {
2230 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2231 FormatSpecifier::KeyValue { key, value } => {
2232 let key = {
2233 let encoding = get_encoding_inner(scx, key)?;
2234 Some(encoding.key.unwrap_or(encoding.value))
2235 };
2236 let value = get_encoding_inner(scx, value)?.value;
2237 SourceDataEncoding { key, value }
2238 }
2239 };
2240
2241 let requires_keyvalue = matches!(
2242 envelope,
2243 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2244 );
2245 let is_keyvalue = encoding.key.is_some();
2246 if requires_keyvalue && !is_keyvalue {
2247 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2248 };
2249
2250 Ok(encoding)
2251}
2252
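/// Resolves the cluster in which a source or sink will run. If no cluster
/// was named, the default cluster is resolved and written back into
/// `in_cluster`, so callers should invoke this before normalizing the
/// statement into its `create_sql`.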
2253fn source_sink_cluster_config<'a, 'ctx>(
2259 scx: &'a StatementContext<'ctx>,
2260 in_cluster: &mut Option<ResolvedClusterName>,
2261) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2262 let cluster = match in_cluster {
2263 None => {
2264 let cluster = scx.catalog.resolve_cluster(None)?;
2265 *in_cluster = Some(ResolvedClusterName {
2266 id: cluster.id(),
2267 print_name: None,
2268 });
2269 cluster
2270 }
2271 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2272 };
2273
2274 Ok(cluster)
2275}
2276
2277generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2278
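/// An Avro schema pair, along with the CSR connection it was resolved from,
/// if any.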
2279#[derive(Debug)]
2280pub struct Schema {
2281 pub key_schema: Option<String>,
2282 pub value_schema: String,
2283 pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2284 pub confluent_wire_format: bool,
2285}
2286
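/// Plans the encoding for a single bare [`Format`]. Only Avro and Protobuf
/// schemas can yield a key encoding here; every other format produces a
/// value-only encoding.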
2287fn get_encoding_inner(
2288 scx: &StatementContext,
2289 format: &Format<Aug>,
2290) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2291 let value = match format {
2292 Format::Bytes => DataEncoding::Bytes,
2293 Format::Avro(schema) => {
2294 let Schema {
2295 key_schema,
2296 value_schema,
2297 csr_connection,
2298 confluent_wire_format,
2299 } = match schema {
2300 AvroSchema::InlineSchema {
2303 schema: ast::Schema { schema },
2304 with_options,
2305 } => {
2306 let AvroSchemaOptionExtracted {
2307 confluent_wire_format,
2308 ..
2309 } = with_options.clone().try_into()?;
2310
2311 Schema {
2312 key_schema: None,
2313 value_schema: schema.clone(),
2314 csr_connection: None,
2315 confluent_wire_format,
2316 }
2317 }
2318 AvroSchema::Csr {
2319 csr_connection:
2320 CsrConnectionAvro {
2321 connection,
2322 seed,
2323 key_strategy: _,
2324 value_strategy: _,
2325 },
2326 } => {
2327 let item = scx.get_item_by_resolved_name(&connection.connection)?;
2328 let csr_connection = match item.connection()? {
2329 Connection::Csr(_) => item.id(),
2330 _ => {
2331 sql_bail!(
2332 "{} is not a schema registry connection",
2333 scx.catalog
2334 .resolve_full_name(item.name())
2335 .to_string()
2336 .quoted()
2337 )
2338 }
2339 };
2340
2341 if let Some(seed) = seed {
2342 Schema {
2343 key_schema: seed.key_schema.clone(),
2344 value_schema: seed.value_schema.clone(),
2345 csr_connection: Some(csr_connection),
2346 confluent_wire_format: true,
2347 }
2348 } else {
2349 unreachable!("CSR seed resolution should already have been called: Avro")
2350 }
2351 }
2352 };
2353
2354 if let Some(key_schema) = key_schema {
2355 return Ok(SourceDataEncoding {
2356 key: Some(DataEncoding::Avro(AvroEncoding {
2357 schema: key_schema,
2358 csr_connection: csr_connection.clone(),
2359 confluent_wire_format,
2360 })),
2361 value: DataEncoding::Avro(AvroEncoding {
2362 schema: value_schema,
2363 csr_connection,
2364 confluent_wire_format,
2365 }),
2366 });
2367 } else {
2368 DataEncoding::Avro(AvroEncoding {
2369 schema: value_schema,
2370 csr_connection,
2371 confluent_wire_format,
2372 })
2373 }
2374 }
2375 Format::Protobuf(schema) => match schema {
2376 ProtobufSchema::Csr {
2377 csr_connection:
2378 CsrConnectionProtobuf {
2379 connection:
2380 CsrConnection {
2381 connection,
2382 options,
2383 },
2384 seed,
2385 },
2386 } => {
2387 if let Some(CsrSeedProtobuf { key, value }) = seed {
2388 let item = scx.get_item_by_resolved_name(connection)?;
2389 let _ = match item.connection()? {
2390 Connection::Csr(connection) => connection,
2391 _ => {
2392 sql_bail!(
2393 "{} is not a schema registry connection",
2394 scx.catalog
2395 .resolve_full_name(item.name())
2396 .to_string()
2397 .quoted()
2398 )
2399 }
2400 };
2401
2402 if !options.is_empty() {
2403 sql_bail!("Protobuf CSR connections do not support any options");
2404 }
2405
2406 let value = DataEncoding::Protobuf(ProtobufEncoding {
2407 descriptors: strconv::parse_bytes(&value.schema)?,
2408 message_name: value.message_name.clone(),
2409 confluent_wire_format: true,
2410 });
2411 if let Some(key) = key {
2412 return Ok(SourceDataEncoding {
2413 key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2414 descriptors: strconv::parse_bytes(&key.schema)?,
2415 message_name: key.message_name.clone(),
2416 confluent_wire_format: true,
2417 })),
2418 value,
2419 });
2420 }
2421 value
2422 } else {
2423 unreachable!("CSR seed resolution should already have been called: Proto")
2424 }
2425 }
2426 ProtobufSchema::InlineSchema {
2427 message_name,
2428 schema: ast::Schema { schema },
2429 } => {
2430 let descriptors = strconv::parse_bytes(schema)?;
2431
2432 DataEncoding::Protobuf(ProtobufEncoding {
2433 descriptors,
2434 message_name: message_name.to_owned(),
2435 confluent_wire_format: false,
2436 })
2437 }
2438 },
2439 Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2440 regex: mz_repr::adt::regex::Regex::new(regex, false)
2441 .map_err(|e| sql_err!("parsing regex: {e}"))?,
2442 }),
2443 Format::Csv { columns, delimiter } => {
2444 let columns = match columns {
2445 CsvColumns::Header { names } => {
2446 if names.is_empty() {
2447 sql_bail!("[internal error] column spec should get names in purify")
2448 }
2449 ColumnSpec::Header {
2450 names: names.iter().cloned().map(|n| n.into_string()).collect(),
2451 }
2452 }
2453 CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2454 };
2455 DataEncoding::Csv(CsvEncoding {
2456 columns,
2457 delimiter: u8::try_from(*delimiter)
2458 .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2459 })
2460 }
2461 Format::Json { array: false } => DataEncoding::Json,
2462 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2463 Format::Text => DataEncoding::Text,
2464 };
2465 Ok(SourceDataEncoding { key: None, value })
2466}
2467
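/// Determines how a source's key, if requested via `INCLUDE KEY`, is
/// surfaced in the relation: under a user-supplied alias, under a derived
/// name, or not at all.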
2468fn get_key_envelope(
2470 included_items: &[SourceIncludeMetadata],
2471 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2472 key_envelope_no_encoding: bool,
2473) -> Result<KeyEnvelope, PlanError> {
2474 let key_definition = included_items
2475 .iter()
2476 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2477 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2478 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2479 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2480 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2481 (Some(name), _) if key_envelope_no_encoding => {
2482 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2483 }
2484 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2485 (_, None) => {
2486 sql_bail!(
2490 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2491 got bare FORMAT"
2492 );
2493 }
2494 }
2495 } else {
2496 Ok(KeyEnvelope::None)
2497 }
2498}
2499
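/// Picks the key envelope for an `INCLUDE KEY` without an alias: composite
/// encodings (Avro, CSV, Protobuf, regex) are flattened into their
/// constituent columns, while single-value encodings land in a column named
/// `key`.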
2500fn get_unnamed_key_envelope(
2503 key: Option<&DataEncoding<ReferencedConnection>>,
2504) -> Result<KeyEnvelope, PlanError> {
2505 let is_composite = match key {
2509 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2510 Some(
2511 DataEncoding::Avro(_)
2512 | DataEncoding::Csv(_)
2513 | DataEncoding::Protobuf(_)
2514 | DataEncoding::Regex { .. },
2515 ) => true,
2516 None => false,
2517 };
2518
2519 if is_composite {
2520 Ok(KeyEnvelope::Flattened)
2521 } else {
2522 Ok(KeyEnvelope::Named("key".to_string()))
2523 }
2524}
2525
2526pub fn describe_create_view(
2527 _: &StatementContext,
2528 _: CreateViewStatement<Aug>,
2529) -> Result<StatementDesc, PlanError> {
2530 Ok(StatementDesc::new(None))
2531}
2532
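/// Plans the common parts of `CREATE [TEMPORARY] VIEW`: the view's query,
/// its qualified name, and its column names.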
2533pub fn plan_view(
2534 scx: &StatementContext,
2535 def: &mut ViewDefinition<Aug>,
2536 temporary: bool,
2537) -> Result<(QualifiedItemName, View), PlanError> {
2538 let create_sql = normalize::create_statement(
2539 scx,
2540 Statement::CreateView(CreateViewStatement {
2541 if_exists: IfExistsBehavior::Error,
2542 temporary,
2543 definition: def.clone(),
2544 }),
2545 )?;
2546
2547 let ViewDefinition {
2548 name,
2549 columns,
2550 query,
2551 } = def;
2552
2553 let query::PlannedRootQuery {
2554 expr,
2555 mut desc,
2556 finishing,
2557 scope: _,
2558 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2559 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2565 &finishing,
2566 expr.arity()
2567 ));
2568 if expr.contains_parameters()? {
2569 return Err(PlanError::ParameterNotAllowed("views".to_string()));
2570 }
2571
2572 let dependencies = expr
2573 .depends_on()
2574 .into_iter()
2575 .map(|gid| scx.catalog.resolve_item_id(&gid))
2576 .collect();
2577
2578 let name = if temporary {
2579 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2580 } else {
2581 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2582 };
2583
2584 plan_utils::maybe_rename_columns(
2585 format!("view {}", scx.catalog.resolve_full_name(&name)),
2586 &mut desc,
2587 columns,
2588 )?;
2589 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2590
2591 if let Some(dup) = names.iter().duplicates().next() {
2592 sql_bail!("column {} specified more than once", dup.quoted());
2593 }
2594
2595 let view = View {
2596 create_sql,
2597 expr,
2598 dependencies,
2599 column_names: names,
2600 temporary,
2601 };
2602
2603 Ok((name, view))
2604}
2605
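/// Plans `CREATE [OR REPLACE] VIEW`, including the checks that govern
/// `IF NOT EXISTS` and `OR REPLACE` behavior.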
2606pub fn plan_create_view(
2607 scx: &StatementContext,
2608 mut stmt: CreateViewStatement<Aug>,
2609) -> Result<Plan, PlanError> {
2610 let CreateViewStatement {
2611 temporary,
2612 if_exists,
2613 definition,
2614 } = &mut stmt;
2615 let (name, view) = plan_view(scx, definition, *temporary)?;
2616
2617 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2620
2621 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2622 let if_exists = true;
2623 let cascade = false;
2624 let maybe_item_to_drop = plan_drop_item(
2625 scx,
2626 ObjectType::View,
2627 if_exists,
2628 definition.name.clone(),
2629 cascade,
2630 )?;
2631
2632 if let Some(id) = maybe_item_to_drop {
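            // Reject the replacement if the new view's definition depends on
            // the item being dropped.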
2634 let dependencies = view.expr.depends_on();
2635 let invalid_drop = scx
2636 .get_item(&id)
2637 .global_ids()
2638 .any(|gid| dependencies.contains(&gid));
2639 if invalid_drop {
2640 let item = scx.catalog.get_item(&id);
2641 sql_bail!(
2642 "cannot replace view {0}: depended upon by new {0} definition",
2643 scx.catalog.resolve_full_name(item.name())
2644 );
2645 }
2646
2647 Some(id)
2648 } else {
2649 None
2650 }
2651 } else {
2652 None
2653 };
2654 let drop_ids = replace
2655 .map(|id| {
2656 scx.catalog
2657 .item_dependents(id)
2658 .into_iter()
2659 .map(|id| id.unwrap_item_id())
2660 .collect()
2661 })
2662 .unwrap_or_default();
2663
2664 let full_name = scx.catalog.resolve_full_name(&name);
2666 let partial_name = PartialItemName::from(full_name.clone());
2667 if let (Ok(item), IfExistsBehavior::Error, false) = (
2670 scx.catalog.resolve_item_or_type(&partial_name),
2671 *if_exists,
2672 ignore_if_exists_errors,
2673 ) {
2674 return Err(PlanError::ItemAlreadyExists {
2675 name: full_name.to_string(),
2676 item_type: item.item_type(),
2677 });
2678 }
2679
2680 Ok(Plan::CreateView(CreateViewPlan {
2681 name,
2682 view,
2683 replace,
2684 drop_ids,
2685 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2686 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2687 }))
2688}
2689
2690pub fn describe_create_materialized_view(
2691 _: &StatementContext,
2692 _: CreateMaterializedViewStatement<Aug>,
2693) -> Result<StatementDesc, PlanError> {
2694 Ok(StatementDesc::new(None))
2695}
2696
2697pub fn describe_create_continual_task(
2698 _: &StatementContext,
2699 _: CreateContinualTaskStatement<Aug>,
2700) -> Result<StatementDesc, PlanError> {
2701 Ok(StatementDesc::new(None))
2702}
2703
2704pub fn describe_create_network_policy(
2705 _: &StatementContext,
2706 _: CreateNetworkPolicyStatement<Aug>,
2707) -> Result<StatementDesc, PlanError> {
2708 Ok(StatementDesc::new(None))
2709}
2710
2711pub fn describe_alter_network_policy(
2712 _: &StatementContext,
2713 _: AlterNetworkPolicyStatement<Aug>,
2714) -> Result<StatementDesc, PlanError> {
2715 Ok(StatementDesc::new(None))
2716}
2717
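/// Plans `CREATE MATERIALIZED VIEW`, including cluster selection, column
/// renaming, `ASSERT NOT NULL` handling, and the `REFRESH` schedule.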
2718pub fn plan_create_materialized_view(
2719 scx: &StatementContext,
2720 mut stmt: CreateMaterializedViewStatement<Aug>,
2721) -> Result<Plan, PlanError> {
2722 let cluster_id =
2723 crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2724 stmt.in_cluster = Some(ResolvedClusterName {
2725 id: cluster_id,
2726 print_name: None,
2727 });
2728
2729 let create_sql =
2730 normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2731
2732 let partial_name = normalize::unresolved_item_name(stmt.name)?;
2733 let name = scx.allocate_qualified_name(partial_name.clone())?;
2734
2735 let query::PlannedRootQuery {
2736 expr,
2737 mut desc,
2738 finishing,
2739 scope: _,
2740 } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2741 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2743 &finishing,
2744 expr.arity()
2745 ));
2746 if expr.contains_parameters()? {
2747 return Err(PlanError::ParameterNotAllowed(
2748 "materialized views".to_string(),
2749 ));
2750 }
2751
2752 plan_utils::maybe_rename_columns(
2753 format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2754 &mut desc,
2755 &stmt.columns,
2756 )?;
2757 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2758
2759 let MaterializedViewOptionExtracted {
2760 assert_not_null,
2761 partition_by,
2762 retain_history,
2763 refresh,
2764 seen: _,
2765 }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2766
2767 if let Some(partition_by) = partition_by {
2768 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2769 check_partition_by(&desc, partition_by)?;
2770 }
2771
2772 let refresh_schedule = {
2773 let mut refresh_schedule = RefreshSchedule::default();
2774 let mut on_commits_seen = 0;
2775 for refresh_option_value in refresh {
2776 if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2777 scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2778 }
2779 match refresh_option_value {
2780 RefreshOptionValue::OnCommit => {
2781 on_commits_seen += 1;
2782 }
2783 RefreshOptionValue::AtCreation => {
2784 soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2785 sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2786 }
2787 RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;

                    let ecx = &ExprContext {
2790 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2791 name: "REFRESH AT",
2792 scope: &Scope::empty(),
2793 relation_type: &SqlRelationType::empty(),
2794 allow_aggregates: false,
2795 allow_subqueries: false,
2796 allow_parameters: false,
2797 allow_windows: false,
2798 };
2799 let hir = plan_expr(ecx, &time)?.cast_to(
2800 ecx,
2801 CastContext::Assignment,
2802 &SqlScalarType::MzTimestamp,
2803 )?;
2804 let timestamp = hir
2806 .into_literal_mz_timestamp()
2807 .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2808 refresh_schedule.ats.push(timestamp);
2809 }
2810 RefreshOptionValue::Every(RefreshEveryOptionValue {
2811 interval,
2812 aligned_to,
2813 }) => {
2814 let interval = Interval::try_from_value(Value::Interval(interval))?;
2815 if interval.as_microseconds() <= 0 {
2816 sql_bail!("REFRESH interval must be positive; got: {}", interval);
2817 }
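                    // Months have no fixed duration, so a refresh schedule
                    // cannot step by months or any larger unit.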
2818 if interval.months != 0 {
2819 sql_bail!("REFRESH interval must not involve units larger than days");
2824 }
2825 let interval = interval.duration()?;
2826 if u64::try_from(interval.as_millis()).is_err() {
2827 sql_bail!("REFRESH interval too large");
2828 }
2829
2830 let mut aligned_to = match aligned_to {
2831 Some(aligned_to) => aligned_to,
2832 None => {
2833 soft_panic_or_log!(
2834 "ALIGNED TO should have been filled in by purification"
2835 );
2836 sql_bail!(
2837 "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2838 )
2839 }
2840 };
2841
2842 transform_ast::transform(scx, &mut aligned_to)?;
2844
2845 let ecx = &ExprContext {
2846 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2847 name: "REFRESH EVERY ... ALIGNED TO",
2848 scope: &Scope::empty(),
2849 relation_type: &SqlRelationType::empty(),
2850 allow_aggregates: false,
2851 allow_subqueries: false,
2852 allow_parameters: false,
2853 allow_windows: false,
2854 };
2855 let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2856 ecx,
2857 CastContext::Assignment,
2858 &SqlScalarType::MzTimestamp,
2859 )?;
2860 let aligned_to_const = aligned_to_hir
2862 .into_literal_mz_timestamp()
2863 .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2864
2865 refresh_schedule.everies.push(RefreshEvery {
2866 interval,
2867 aligned_to: aligned_to_const,
2868 });
2869 }
2870 }
2871 }
2872
2873 if on_commits_seen > 1 {
2874 sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2875 }
2876 if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2877 sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2878 }
2879
2880 if refresh_schedule == RefreshSchedule::default() {
2881 None
2882 } else {
2883 Some(refresh_schedule)
2884 }
2885 };
2886
2887 let as_of = stmt.as_of.map(Timestamp::from);
2888 let compaction_window = plan_retain_history_option(scx, retain_history)?;
2889 let mut non_null_assertions = assert_not_null
2890 .into_iter()
2891 .map(normalize::column_name)
2892 .map(|assertion_name| {
2893 column_names
2894 .iter()
2895 .position(|col| col == &assertion_name)
2896 .ok_or_else(|| {
2897 sql_err!(
2898 "column {} in ASSERT NOT NULL option not found",
2899 assertion_name.quoted()
2900 )
2901 })
2902 })
2903 .collect::<Result<Vec<_>, _>>()?;
2904 non_null_assertions.sort();
2905 if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2906 let dup = &column_names[*dup];
2907 sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2908 }
2909
2910 if let Some(dup) = column_names.iter().duplicates().next() {
2911 sql_bail!("column {} specified more than once", dup.quoted());
2912 }
2913
2914 let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2917 Ok(true) => IfExistsBehavior::Skip,
2918 _ => stmt.if_exists,
2919 };
2920
2921 let mut replace = None;
2922 let mut if_not_exists = false;
2923 match if_exists {
2924 IfExistsBehavior::Replace => {
2925 let if_exists = true;
2926 let cascade = false;
2927 let replace_id = plan_drop_item(
2928 scx,
2929 ObjectType::MaterializedView,
2930 if_exists,
2931 partial_name.into(),
2932 cascade,
2933 )?;
2934
2935 if let Some(id) = replace_id {
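                // Reject the replacement if the new definition depends on the
                // materialized view being dropped.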
2937 let dependencies = expr.depends_on();
2938 let invalid_drop = scx
2939 .get_item(&id)
2940 .global_ids()
2941 .any(|gid| dependencies.contains(&gid));
2942 if invalid_drop {
2943 let item = scx.catalog.get_item(&id);
2944 sql_bail!(
2945 "cannot replace materialized view {0}: depended upon by new {0} definition",
2946 scx.catalog.resolve_full_name(item.name())
2947 );
2948 }
2949 replace = Some(id);
2950 }
2951 }
2952 IfExistsBehavior::Skip => if_not_exists = true,
2953 IfExistsBehavior::Error => (),
2954 }
2955 let drop_ids = replace
2956 .map(|id| {
2957 scx.catalog
2958 .item_dependents(id)
2959 .into_iter()
2960 .map(|id| id.unwrap_item_id())
2961 .collect()
2962 })
2963 .unwrap_or_default();
2964 let dependencies = expr
2965 .depends_on()
2966 .into_iter()
2967 .map(|gid| scx.catalog.resolve_item_id(&gid))
2968 .collect();
2969
2970 let full_name = scx.catalog.resolve_full_name(&name);
2972 let partial_name = PartialItemName::from(full_name.clone());
2973 if let (IfExistsBehavior::Error, Ok(item)) =
2976 (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
2977 {
2978 return Err(PlanError::ItemAlreadyExists {
2979 name: full_name.to_string(),
2980 item_type: item.item_type(),
2981 });
2982 }
2983
2984 Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
2985 name,
2986 materialized_view: MaterializedView {
2987 create_sql,
2988 expr,
2989 dependencies,
2990 column_names,
2991 cluster_id,
2992 non_null_assertions,
2993 compaction_window,
2994 refresh_schedule,
2995 as_of,
2996 },
2997 replace,
2998 drop_ids,
2999 if_not_exists,
3000 ambiguous_columns: *scx.ambiguous_columns.borrow(),
3001 }))
3002}
3003
3004generate_extracted_config!(
3005 MaterializedViewOption,
3006 (AssertNotNull, Ident, AllowMultiple),
3007 (PartitionBy, Vec<Ident>),
3008 (RetainHistory, OptionalDuration),
3009 (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
3010);
3011
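/// Plans `CREATE CONTINUAL TASK`. Each statement in the task body is planned
/// as a query against the not-yet-created task itself, which is exposed to
/// planning as a CTE-like placeholder.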
3012pub fn plan_create_continual_task(
3013 scx: &StatementContext,
3014 mut stmt: CreateContinualTaskStatement<Aug>,
3015) -> Result<Plan, PlanError> {
3016 match &stmt.sugar {
3017 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3018 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3019 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3020 }
3021 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3022 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3023 }
3024 };
3025 let cluster_id = match &stmt.in_cluster {
3026 None => scx.catalog.resolve_cluster(None)?.id(),
3027 Some(in_cluster) => in_cluster.id,
3028 };
3029 stmt.in_cluster = Some(ResolvedClusterName {
3030 id: cluster_id,
3031 print_name: None,
3032 });
3033
3034 let create_sql =
3035 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3036
3037 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3038
3039 let mut desc = match stmt.columns {
3044 None => None,
3045 Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
3047 for col in columns.iter() {
3048 desc_columns.push((
3049 normalize::column_name(col.name.clone()),
3050 SqlColumnType {
3051 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3052 nullable: false,
3053 },
3054 ));
3055 }
3056 Some(RelationDesc::from_names_and_types(desc_columns))
3057 }
3058 };
3059 let input = scx.get_item_by_resolved_name(&stmt.input)?;
3060 match input.item_type() {
3061 CatalogItemType::ContinualTask
3064 | CatalogItemType::Table
3065 | CatalogItemType::MaterializedView
3066 | CatalogItemType::Source => {}
3067 CatalogItemType::Sink
3068 | CatalogItemType::View
3069 | CatalogItemType::Index
3070 | CatalogItemType::Type
3071 | CatalogItemType::Func
3072 | CatalogItemType::Secret
3073 | CatalogItemType::Connection => {
3074 sql_bail!(
3075 "CONTINUAL TASK cannot use {} as an input",
3076 input.item_type()
3077 );
3078 }
3079 }
3080
3081 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3082 let ct_name = stmt.name;
3083 let placeholder_id = match &ct_name {
3084 ResolvedItemName::ContinualTask { id, name } => {
3085 let desc = match desc.as_ref().cloned() {
3086 Some(x) => x,
3087 None => {
3088 let input_name = scx.catalog.resolve_full_name(input.name());
3093 input.desc(&input_name)?.into_owned()
3094 }
3095 };
3096 qcx.ctes.insert(
3097 *id,
3098 CteDesc {
3099 name: name.item.clone(),
3100 desc,
3101 },
3102 );
3103 Some(*id)
3104 }
3105 _ => None,
3106 };
3107
3108 let mut exprs = Vec::new();
3109 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3110 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3111 let query::PlannedRootQuery {
3112 expr,
3113 desc: desc_query,
3114 finishing,
3115 scope: _,
3116 } = query::plan_ct_query(&mut qcx, query)?;
3117 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3120 &finishing,
3121 expr.arity()
3122 ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
3130 let expr = match desc.as_mut() {
3131 None => {
3132 desc = Some(desc_query);
3133 expr
3134 }
3135 Some(desc) => {
3136 if desc_query.arity() > desc.arity() {
3139 sql_bail!(
3140 "statement {}: INSERT has more expressions than target columns",
3141 idx
3142 );
3143 }
3144 if desc_query.arity() < desc.arity() {
3145 sql_bail!(
3146 "statement {}: INSERT has more target columns than expressions",
3147 idx
3148 );
3149 }
3150 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3153 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3154 let expr = expr.map_err(|e| {
3155 sql_err!(
3156 "statement {}: column {} is of type {} but expression is of type {}",
3157 idx,
3158 desc.get_name(e.column).quoted(),
3159 qcx.humanize_scalar_type(&e.target_type, false),
3160 qcx.humanize_scalar_type(&e.source_type, false),
3161 )
3162 })?;
3163
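                // If the query produces a nullable column where the declared
                // description is non-nullable, widen the declared column's
                // nullability to match.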
3164 let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3167 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3168 if updated {
3169 let new_types = zip_types().map(|(ct, q)| {
3170 let mut ct = ct.clone();
3171 if q.nullable {
3172 ct.nullable = true;
3173 }
3174 ct
3175 });
3176 *desc = RelationDesc::from_names_and_types(
3177 desc.iter_names().cloned().zip_eq(new_types),
3178 );
3179 }
3180
3181 expr
3182 }
3183 };
3184 match stmt {
3185 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3186 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3187 }
3188 }
3189 let expr = exprs
3192 .into_iter()
3193 .reduce(|acc, expr| acc.union(expr))
3194 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3195 let dependencies = expr
3196 .depends_on()
3197 .into_iter()
3198 .map(|gid| scx.catalog.resolve_item_id(&gid))
3199 .collect();
3200
3201 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3202 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3203 if let Some(dup) = column_names.iter().duplicates().next() {
3204 sql_bail!("column {} specified more than once", dup.quoted());
3205 }
3206
3207 let name = match &ct_name {
3209 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3210 ResolvedItemName::ContinualTask { name, .. } => {
3211 let name = scx.allocate_qualified_name(name.clone())?;
3212 let full_name = scx.catalog.resolve_full_name(&name);
3213 let partial_name = PartialItemName::from(full_name.clone());
3214 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3217 return Err(PlanError::ItemAlreadyExists {
3218 name: full_name.to_string(),
3219 item_type: item.item_type(),
3220 });
3221 }
3222 name
3223 }
3224 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3225 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3226 };
3227
3228 let as_of = stmt.as_of.map(Timestamp::from);
3229 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3230 name,
3231 placeholder_id,
3232 desc,
3233 input_id: input.global_id(),
3234 with_snapshot: snapshot.unwrap_or(true),
3235 continual_task: MaterializedView {
3236 create_sql,
3237 expr,
3238 dependencies,
3239 column_names,
3240 cluster_id,
3241 non_null_assertions: Vec::new(),
3242 compaction_window: None,
3243 refresh_schedule: None,
3244 as_of,
3245 },
3246 }))
3247}
3248
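/// Desugars a continual task `INSERT` or `DELETE` statement into the
/// `SELECT` query to plan, returning `None` if the statement uses constructs
/// (column lists, `RETURNING`, `USING`, `DEFAULT VALUES`) that continual
/// tasks do not support.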
3249fn continual_task_query<'a>(
3250 ct_name: &ResolvedItemName,
3251 stmt: &'a ast::ContinualTaskStmt<Aug>,
3252) -> Option<ast::Query<Aug>> {
3253 match stmt {
3254 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3255 table_name: _,
3256 columns,
3257 source,
3258 returning,
3259 }) => {
3260 if !columns.is_empty() || !returning.is_empty() {
3261 return None;
3262 }
3263 match source {
3264 ast::InsertSource::Query(query) => Some(query.clone()),
3265 ast::InsertSource::DefaultValues => None,
3266 }
3267 }
3268 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3269 table_name: _,
3270 alias,
3271 using,
3272 selection,
3273 }) => {
3274 if !using.is_empty() {
3275 return None;
3276 }
3277 let from = ast::TableWithJoins {
3279 relation: ast::TableFactor::Table {
3280 name: ct_name.clone(),
3281 alias: alias.clone(),
3282 },
3283 joins: Vec::new(),
3284 };
3285 let select = ast::Select {
3286 from: vec![from],
3287 selection: selection.clone(),
3288 distinct: None,
3289 projection: vec![ast::SelectItem::Wildcard],
3290 group_by: Vec::new(),
3291 having: None,
3292 qualify: None,
3293 options: Vec::new(),
3294 };
3295 let query = ast::Query {
3296 ctes: ast::CteBlock::Simple(Vec::new()),
3297 body: ast::SetExpr::Select(Box::new(select)),
3298 order_by: Vec::new(),
3299 limit: None,
3300 offset: None,
3301 };
3302 Some(query)
3304 }
3305 }
3306}
3307
3308generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3309
3310pub fn describe_create_sink(
3311 _: &StatementContext,
3312 _: CreateSinkStatement<Aug>,
3313) -> Result<StatementDesc, PlanError> {
3314 Ok(StatementDesc::new(None))
3315}
3316
3317generate_extracted_config!(
3318 CreateSinkOption,
3319 (Snapshot, bool),
3320 (PartitionStrategy, String),
3321 (Version, u64),
3322 (CommitInterval, Duration)
3323);
3324
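/// Plans `CREATE SINK`, first checking that the sink has a name and that no
/// object with that name already exists.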
3325pub fn plan_create_sink(
3326 scx: &StatementContext,
3327 stmt: CreateSinkStatement<Aug>,
3328) -> Result<Plan, PlanError> {
3329 let Some(name) = stmt.name.clone() else {
3331 return Err(PlanError::MissingName(CatalogItemType::Sink));
3332 };
3333 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3334 let full_name = scx.catalog.resolve_full_name(&name);
3335 let partial_name = PartialItemName::from(full_name.clone());
3336 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3337 return Err(PlanError::ItemAlreadyExists {
3338 name: full_name.to_string(),
3339 item_type: item.item_type(),
3340 });
3341 }
3342
3343 plan_sink(scx, stmt)
3344}
3345
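/// Plans the body of a `CREATE SINK` statement: key columns, headers,
/// format, envelope, and the cluster the sink will run on. Note that this
/// mutates `stmt.in_cluster` (via `source_sink_cluster_config`) before
/// normalizing the statement into its `create_sql`.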
3346fn plan_sink(
3351 scx: &StatementContext,
3352 mut stmt: CreateSinkStatement<Aug>,
3353) -> Result<Plan, PlanError> {
3354 let CreateSinkStatement {
3355 name,
3356 in_cluster: _,
3357 from,
3358 connection,
3359 format,
3360 envelope,
3361 if_not_exists,
3362 with_options,
3363 } = stmt.clone();
3364
3365 let Some(name) = name else {
3366 return Err(PlanError::MissingName(CatalogItemType::Sink));
3367 };
3368 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3369
3370 let envelope = match envelope {
3371 Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3372 Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3373 None => sql_bail!("ENVELOPE clause is required"),
3374 };
3375
3376 let from_name = &from;
3377 let from = scx.get_item_by_resolved_name(&from)?;
3378 if from.id().is_system() {
3379 bail_unsupported!("creating a sink directly on a catalog object");
3380 }
3381 let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
3382 let key_indices = match &connection {
3383 CreateSinkConnection::Kafka { key: Some(key), .. }
3384 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3385 let key_columns = key
3386 .key_columns
3387 .clone()
3388 .into_iter()
3389 .map(normalize::column_name)
3390 .collect::<Vec<_>>();
3391 let mut uniq = BTreeSet::new();
3392 for col in key_columns.iter() {
3393 if !uniq.insert(col) {
3394 sql_bail!("duplicate column referenced in KEY: {}", col);
3395 }
3396 }
3397 let indices = key_columns
3398 .iter()
3399 .map(|col| -> anyhow::Result<usize> {
3400 let name_idx =
3401 desc.get_by_name(col)
3402 .map(|(idx, _type)| idx)
3403 .ok_or_else(|| {
3404 sql_err!("column referenced in KEY does not exist: {}", col)
3405 })?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    return Err(
                        sql_err!("column referenced in KEY is ambiguous: {}", col).into()
                    );
                }
3409 Ok(name_idx)
3410 })
3411 .collect::<Result<Vec<_>, _>>()?;
3412 let is_valid_key = desc
3413 .typ()
3414 .keys
3415 .iter()
3416 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3417
3418 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3419 if key.not_enforced {
3420 scx.catalog
3421 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3422 key: key_columns.clone(),
3423 name: name.item.clone(),
3424 })
3425 } else {
3426 return Err(PlanError::UpsertSinkWithInvalidKey {
3427 name: from_name.full_name_str(),
3428 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3429 valid_keys: desc
3430 .typ()
3431 .keys
3432 .iter()
3433 .map(|key| {
3434 key.iter()
3435 .map(|col| desc.get_name(*col).as_str().into())
3436 .collect()
3437 })
3438 .collect(),
3439 });
3440 }
3441 }
3442 Some(indices)
3443 }
3444 CreateSinkConnection::Kafka { key: None, .. }
3445 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3446 };
3447
3448 let headers_index = match &connection {
3449 CreateSinkConnection::Kafka {
3450 headers: Some(headers),
3451 ..
3452 } => {
3453 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3454
3455 match envelope {
3456 SinkEnvelope::Upsert => (),
3457 SinkEnvelope::Debezium => {
3458 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3459 }
3460 };
3461
3462 let headers = normalize::column_name(headers.clone());
3463 let (idx, ty) = desc
3464 .get_by_name(&headers)
3465 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3466
3467 if desc.get_unambiguous_name(idx).is_none() {
3468 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3469 }
3470
3471 match &ty.scalar_type {
3472 SqlScalarType::Map { value_type, .. }
3473 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3474 _ => sql_bail!(
3475 "HEADERS column must have type map[text => text] or map[text => bytea]"
3476 ),
3477 }
3478
3479 Some(idx)
3480 }
3481 _ => None,
3482 };
3483
3484 let relation_key_indices = desc.typ().keys.get(0).cloned();
3486
3487 let key_desc_and_indices = key_indices.map(|key_indices| {
3488 let cols = desc
3489 .iter()
3490 .map(|(name, ty)| (name.clone(), ty.clone()))
3491 .collect::<Vec<_>>();
3492 let (names, types): (Vec<_>, Vec<_>) =
3493 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3494 let typ = SqlRelationType::new(types);
3495 (RelationDesc::new(typ, names), key_indices)
3496 });
3497
3498 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3499 return Err(PlanError::UpsertSinkWithoutKey);
3500 }
3501
3502 let connection_builder = match connection {
3503 CreateSinkConnection::Kafka {
3504 connection,
3505 options,
3506 ..
3507 } => kafka_sink_builder(
3508 scx,
3509 connection,
3510 options,
3511 format,
3512 relation_key_indices,
3513 key_desc_and_indices,
3514 headers_index,
3515 desc.into_owned(),
3516 envelope,
3517 from.id(),
3518 )?,
3519 CreateSinkConnection::Iceberg {
3520 connection,
3521 aws_connection,
3522 options,
3523 ..
3524 } => iceberg_sink_builder(
3525 scx,
3526 connection,
3527 aws_connection,
3528 options,
3529 relation_key_indices,
3530 key_desc_and_indices,
3531 )?,
3532 };
3533
3534 let CreateSinkOptionExtracted {
3535 snapshot,
3536 version,
3537 partition_strategy: _,
3538 seen: _,
3539 commit_interval: _,
3540 } = with_options.try_into()?;
3541
3542 let with_snapshot = snapshot.unwrap_or(true);
3544 let version = version.unwrap_or(0);
3546
3547 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3551 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3552
3553 Ok(Plan::CreateSink(CreateSinkPlan {
3554 name,
3555 sink: Sink {
3556 create_sql,
3557 from: from.global_id(),
3558 connection: connection_builder,
3559 envelope,
3560 version,
3561 },
3562 with_snapshot,
3563 if_not_exists,
3564 in_cluster: in_cluster.id(),
3565 }))
3566}
3567
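/// Builds the error reported when a user-declared key constraint conflicts
/// with the keys already known for the relation.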
3568fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3569 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3570
3571 let existing_keys = desc
3572 .typ()
3573 .keys
3574 .iter()
3575 .map(|key_columns| {
3576 key_columns
3577 .iter()
3578 .map(|col| desc.get_name(*col).as_str())
3579 .join(", ")
3580 })
3581 .join(", ");
3582
3583 sql_err!(
3584 "Key constraint ({}) conflicts with existing key ({})",
3585 user_keys,
3586 existing_keys
3587 )
3588}
3589
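/// Extracted CSR configuration options. Written out by hand rather than with
/// `generate_extracted_config!`: repeated `DOC ON` options are folded into
/// the per-target doc maps below instead of being single values.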
3590#[derive(Debug, Default, PartialEq, Clone)]
3593pub struct CsrConfigOptionExtracted {
3594 seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3595 pub(crate) avro_key_fullname: Option<String>,
3596 pub(crate) avro_value_fullname: Option<String>,
3597 pub(crate) null_defaults: bool,
3598 pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3599 pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3600 pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3601 pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3602}
3603
3604impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3605 type Error = crate::plan::PlanError;
3606 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3607 let mut extracted = CsrConfigOptionExtracted::default();
3608 let mut common_doc_comments = BTreeMap::new();
3609 for option in v {
3610 if !extracted.seen.insert(option.name.clone()) {
3611 return Err(PlanError::Unstructured({
3612 format!("{} specified more than once", option.name)
3613 }));
3614 }
3615 let option_name = option.name.clone();
3616 let option_name_str = option_name.to_ast_string_simple();
3617 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3618 option_name: option_name.to_ast_string_simple(),
3619 err: e.into(),
3620 };
3621 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3622 val.map(|s| match s {
3623 WithOptionValue::Value(Value::String(s)) => {
3624 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3625 }
3626 _ => Err("must be a string".to_string()),
3627 })
3628 .transpose()
3629 .map_err(PlanError::Unstructured)
3630 .map_err(better_error)
3631 };
3632 match option.name {
3633 CsrConfigOptionName::AvroKeyFullname => {
3634 extracted.avro_key_fullname =
3635 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3636 }
3637 CsrConfigOptionName::AvroValueFullname => {
3638 extracted.avro_value_fullname =
3639 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3640 }
3641 CsrConfigOptionName::NullDefaults => {
3642 extracted.null_defaults =
3643 <bool>::try_from_value(option.value).map_err(better_error)?;
3644 }
3645 CsrConfigOptionName::AvroDocOn(doc_on) => {
3646 let value = String::try_from_value(option.value.ok_or_else(|| {
3647 PlanError::InvalidOptionValue {
3648 option_name: option_name_str,
3649 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3650 }
3651 })?)
3652 .map_err(better_error)?;
3653 let key = match doc_on.identifier {
3654 DocOnIdentifier::Column(ast::ColumnName {
3655 relation: ResolvedItemName::Item { id, .. },
3656 column: ResolvedColumnReference::Column { name, index: _ },
3657 }) => DocTarget::Field {
3658 object_id: id,
3659 column_name: name,
3660 },
3661 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3662 DocTarget::Type(id)
3663 }
3664 _ => unreachable!(),
3665 };
3666
3667 match doc_on.for_schema {
3668 DocOnSchema::KeyOnly => {
3669 extracted.key_doc_options.insert(key, value);
3670 }
3671 DocOnSchema::ValueOnly => {
3672 extracted.value_doc_options.insert(key, value);
3673 }
3674 DocOnSchema::All => {
3675 common_doc_comments.insert(key, value);
3676 }
3677 }
3678 }
3679 CsrConfigOptionName::KeyCompatibilityLevel => {
3680 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3681 }
3682 CsrConfigOptionName::ValueCompatibilityLevel => {
3683 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3684 }
3685 }
3686 }
3687
3688 for (key, value) in common_doc_comments {
3689 if !extracted.key_doc_options.contains_key(&key) {
3690 extracted.key_doc_options.insert(key.clone(), value.clone());
3691 }
3692 if !extracted.value_doc_options.contains_key(&key) {
3693 extracted.value_doc_options.insert(key, value);
3694 }
3695 }
3696 Ok(extracted)
3697 }
3698}
3699
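/// Builds the Iceberg variant of a sink connection, validating that the
/// referenced catalog and AWS connections have the right types and that the
/// required `TABLE` and `NAMESPACE` options are present.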
3700fn iceberg_sink_builder(
3701 scx: &StatementContext,
3702 catalog_connection: ResolvedItemName,
3703 aws_connection: ResolvedItemName,
3704 options: Vec<IcebergSinkConfigOption<Aug>>,
3705 relation_key_indices: Option<Vec<usize>>,
3706 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3707) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3708 scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3709 let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3710 let catalog_connection_id = catalog_connection_item.id();
3711 let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3712 let aws_connection_id = aws_connection_item.id();
3713 if !matches!(
3714 catalog_connection_item.connection()?,
3715 Connection::IcebergCatalog(_)
3716 ) {
3717 sql_bail!(
3718 "{} is not an iceberg catalog connection",
3719 scx.catalog
3720 .resolve_full_name(catalog_connection_item.name())
3721 .to_string()
3722 .quoted()
3723 );
3724 };
3725
3726 if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3727 sql_bail!(
3728 "{} is not an AWS connection",
3729 scx.catalog
3730 .resolve_full_name(aws_connection_item.name())
3731 .to_string()
3732 .quoted()
3733 );
3734 }
3735
3736 let IcebergSinkConfigOptionExtracted {
3737 table,
3738 namespace,
3739 seen: _,
3740 }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3741
3742 let Some(table) = table else {
3743 sql_bail!("Iceberg sink must specify TABLE");
3744 };
3745 let Some(namespace) = namespace else {
3746 sql_bail!("Iceberg sink must specify NAMESPACE");
3747 };
3748
3749 Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3750 catalog_connection_id,
3751 catalog_connection: catalog_connection_id,
3752 aws_connection_id,
3753 aws_connection: aws_connection_id,
3754 table,
3755 namespace,
3756 relation_key_indices,
3757 key_desc_and_indices,
3758 }))
3759}
3760
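/// Builds a Kafka sink connection, e.g. for `CREATE SINK s FROM t INTO
/// KAFKA CONNECTION k (TOPIC 'events') KEY (id) FORMAT JSON ENVELOPE
/// UPSERT`, resolving the formats, progress/transactional ID styles, and
/// topic configuration.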
3761fn kafka_sink_builder(
3762 scx: &StatementContext,
3763 connection: ResolvedItemName,
3764 options: Vec<KafkaSinkConfigOption<Aug>>,
3765 format: Option<FormatSpecifier<Aug>>,
3766 relation_key_indices: Option<Vec<usize>>,
3767 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3768 headers_index: Option<usize>,
3769 value_desc: RelationDesc,
3770 envelope: SinkEnvelope,
3771 sink_from: CatalogItemId,
3772) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3773 let connection_item = scx.get_item_by_resolved_name(&connection)?;
3775 let connection_id = connection_item.id();
3776 match connection_item.connection()? {
3777 Connection::Kafka(_) => (),
3778 _ => sql_bail!(
3779 "{} is not a kafka connection",
3780 scx.catalog.resolve_full_name(connection_item.name())
3781 ),
3782 };
3783
3784 let KafkaSinkConfigOptionExtracted {
3785 topic,
3786 compression_type,
3787 partition_by,
3788 progress_group_id_prefix,
3789 transactional_id_prefix,
3790 legacy_ids,
3791 topic_config,
3792 topic_metadata_refresh_interval,
3793 topic_partition_count,
3794 topic_replication_factor,
3795 seen: _,
3796 }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3797
3798 let transactional_id = match (transactional_id_prefix, legacy_ids) {
3799 (Some(_), Some(true)) => {
3800 sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3801 }
3802 (None, Some(true)) => KafkaIdStyle::Legacy,
3803 (prefix, _) => KafkaIdStyle::Prefix(prefix),
3804 };
3805
3806 let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3807 (Some(_), Some(true)) => {
3808 sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3809 }
3810 (None, Some(true)) => KafkaIdStyle::Legacy,
3811 (prefix, _) => KafkaIdStyle::Prefix(prefix),
3812 };
3813
3814 let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3815
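    // librdkafka caps `topic.metadata.refresh.interval.ms` at one hour, so
    // reject larger values here rather than failing at runtime.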
3816 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3817 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3820 }
3821
3822 let assert_positive = |val: Option<i32>, name: &str| {
3823 if let Some(val) = val {
3824 if val <= 0 {
3825 sql_bail!("{} must be a positive integer", name);
3826 }
3827 }
3828 val.map(NonNeg::try_from)
3829 .transpose()
3830 .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3831 };
3832 let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3833 let topic_replication_factor =
3834 assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3835
3836 let gen_avro_schema_options = |conn| {
3839 let CsrConnectionAvro {
3840 connection:
3841 CsrConnection {
3842 connection,
3843 options,
3844 },
3845 seed,
3846 key_strategy,
3847 value_strategy,
3848 } = conn;
3849 if seed.is_some() {
3850 sql_bail!("SEED option does not make sense with sinks");
3851 }
3852 if key_strategy.is_some() {
3853 sql_bail!("KEY STRATEGY option does not make sense with sinks");
3854 }
3855 if value_strategy.is_some() {
3856 sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3857 }
3858
3859 let item = scx.get_item_by_resolved_name(&connection)?;
3860 let csr_connection = match item.connection()? {
3861 Connection::Csr(_) => item.id(),
3862 _ => {
3863 sql_bail!(
3864 "{} is not a schema registry connection",
3865 scx.catalog
3866 .resolve_full_name(item.name())
3867 .to_string()
3868 .quoted()
3869 )
3870 }
3871 };
3872 let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3873
3874 if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3875 sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3876 }
3877
3878 if key_desc_and_indices.is_some()
3879 && (extracted_options.avro_key_fullname.is_some()
3880 ^ extracted_options.avro_value_fullname.is_some())
3881 {
3882 sql_bail!(
3883 "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3884 );
3885 }
3886
3887 Ok((csr_connection, extracted_options))
3888 };
3889
3890 let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3891 Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3892 Format::Bytes if desc.arity() == 1 => {
3893 let col_type = &desc.typ().column_types[0].scalar_type;
3894 if !mz_pgrepr::Value::can_encode_binary(col_type) {
3895 bail_unsupported!(format!(
3896 "BYTES format with non-encodable type: {:?}",
3897 col_type
3898 ));
3899 }
3900
3901 Ok(KafkaSinkFormatType::Bytes)
3902 }
3903 Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3904 Format::Bytes | Format::Text => {
3905 bail_unsupported!("BYTES or TEXT format with multiple columns")
3906 }
3907 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3908 Format::Avro(AvroSchema::Csr { csr_connection }) => {
3909 let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3910 let schema = if is_key {
3911 AvroSchemaGenerator::new(
3912 desc.clone(),
3913 false,
3914 options.key_doc_options,
3915 options.avro_key_fullname.as_deref().unwrap_or("row"),
3916 options.null_defaults,
3917 Some(sink_from),
3918 false,
3919 )?
3920 .schema()
3921 .to_string()
3922 } else {
3923 AvroSchemaGenerator::new(
3924 desc.clone(),
3925 matches!(envelope, SinkEnvelope::Debezium),
3926 options.value_doc_options,
3927 options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3928 options.null_defaults,
3929 Some(sink_from),
3930 true,
3931 )?
3932 .schema()
3933 .to_string()
3934 };
3935 Ok(KafkaSinkFormatType::Avro {
3936 schema,
3937 compatibility_level: if is_key {
3938 options.key_compatibility_level
3939 } else {
3940 options.value_compatibility_level
3941 },
3942 csr_connection,
3943 })
3944 }
3945 format => bail_unsupported!(format!("sink format {:?}", format)),
3946 };
3947
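// Plan the PARTITION BY expression against the sink's value relation. Under
// ENVELOPE DEBEZIUM, referencing a non-key column is an error
// (InvalidPartitionByEnvelopeDebezium); the expression must cast to UInt64.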
3948 let partition_by = match &partition_by {
3949 Some(partition_by) => {
3950 let mut scope = Scope::from_source(None, value_desc.iter_names());
3951
3952 match envelope {
3953 SinkEnvelope::Upsert => (),
3954 SinkEnvelope::Debezium => {
3955 let key_indices: HashSet<_> = key_desc_and_indices
3956 .as_ref()
3957 .map(|(_desc, indices)| indices.as_slice())
3958 .unwrap_or_default()
3959 .into_iter()
3960 .collect();
3961 for (i, item) in scope.items.iter_mut().enumerate() {
3962 if !key_indices.contains(&i) {
3963 item.error_if_referenced = Some(|_table, column| {
3964 PlanError::InvalidPartitionByEnvelopeDebezium {
3965 column_name: column.to_string(),
3966 }
3967 });
3968 }
3969 }
3970 }
3971 };
3972
3973 let ecx = &ExprContext {
3974 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3975 name: "PARTITION BY",
3976 scope: &scope,
3977 relation_type: value_desc.typ(),
3978 allow_aggregates: false,
3979 allow_subqueries: false,
3980 allow_parameters: false,
3981 allow_windows: false,
3982 };
3983 let expr = plan_expr(ecx, partition_by)?.cast_to(
3984 ecx,
3985 CastContext::Assignment,
3986 &SqlScalarType::UInt64,
3987 )?;
3988 let expr = expr.lower_uncorrelated()?;
3989
3990 Some(expr)
3991 }
3992 _ => None,
3993 };
3994
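// Resolve key and value formats: a bare FORMAT applies to both, while
// KEY FORMAT ... VALUE FORMAT specifies them independently. The key format is
// only planned when the sink actually has a key.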
3995 let format = match format {
3997 Some(FormatSpecifier::KeyValue { key, value }) => {
3998 let key_format = match key_desc_and_indices.as_ref() {
3999 Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4000 None => None,
4001 };
4002 KafkaSinkFormat {
4003 value_format: map_format(value, &value_desc, false)?,
4004 key_format,
4005 }
4006 }
4007 Some(FormatSpecifier::Bare(format)) => {
4008 let key_format = match key_desc_and_indices.as_ref() {
4009 Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4010 None => None,
4011 };
4012 KafkaSinkFormat {
4013 value_format: map_format(format, &value_desc, false)?,
4014 key_format,
4015 }
4016 }
4017 None => bail_unsupported!("sink without format"),
4018 };
4019
4020 Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4021 connection_id,
4022 connection: connection_id,
4023 format,
4024 topic: topic_name,
4025 relation_key_indices,
4026 key_desc_and_indices,
4027 headers_index,
4028 value_desc,
4029 partition_by,
4030 compression_type,
4031 progress_group_id,
4032 transactional_id,
4033 topic_options: KafkaTopicOptions {
4034 partition_count: topic_partition_count,
4035 replication_factor: topic_replication_factor,
4036 topic_config: topic_config.unwrap_or_default(),
4037 },
4038 topic_metadata_refresh_interval,
4039 }))
4040}
4041
4042pub fn describe_create_index(
4043 _: &StatementContext,
4044 _: CreateIndexStatement<Aug>,
4045) -> Result<StatementDesc, PlanError> {
4046 Ok(StatementDesc::new(None))
4047}
4048
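// Illustrative usage (assumed syntax, following the Materialize docs):
//   CREATE INDEX t_geo_idx IN CLUSTER c ON t (geo_id);
// Omitting the column list indexes the relation's default key.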
4049pub fn plan_create_index(
4050 scx: &StatementContext,
4051 mut stmt: CreateIndexStatement<Aug>,
4052) -> Result<Plan, PlanError> {
4053 let CreateIndexStatement {
4054 name,
4055 on_name,
4056 in_cluster,
4057 key_parts,
4058 with_options,
4059 if_not_exists,
4060 } = &mut stmt;
4061 let on = scx.get_item_by_resolved_name(on_name)?;
4062 let on_item_type = on.item_type();
4063
4064 if !matches!(
4065 on_item_type,
4066 CatalogItemType::View
4067 | CatalogItemType::MaterializedView
4068 | CatalogItemType::Source
4069 | CatalogItemType::Table
4070 ) {
4071 sql_bail!(
4072 "index cannot be created on {} because it is a {}",
4073 on_name.full_name_str(),
4074 on.item_type()
4075 )
4076 }
4077
4078 let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
4079
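// With no explicit key, fall back to the relation's default key, preferring
// unambiguous column names and otherwise 1-based column ordinals.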
4080 let filled_key_parts = match key_parts {
4081 Some(kp) => kp.to_vec(),
4082 None => {
4083 on.desc(&scx.catalog.resolve_full_name(on.name()))?
4085 .typ()
4086 .default_key()
4087 .iter()
4088 .map(|i| match on_desc.get_unambiguous_name(*i) {
4089 Some(n) => Expr::Identifier(vec![n.clone().into()]),
4090 _ => Expr::Value(Value::Number((i + 1).to_string())),
4091 })
4092 .collect()
4093 }
4094 };
4095 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4096
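// Derive a name when none was given: `<object>_primary_idx` for the default
// key, otherwise `<object>_<key columns>_idx`, with "expr" standing in for
// non-column key expressions.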
4097 let index_name = if let Some(name) = name {
4098 QualifiedItemName {
4099 qualifiers: on.name().qualifiers.clone(),
4100 item: normalize::ident(name.clone()),
4101 }
4102 } else {
4103 let mut idx_name = QualifiedItemName {
4104 qualifiers: on.name().qualifiers.clone(),
4105 item: on.name().item.clone(),
4106 };
4107 if key_parts.is_none() {
4108 idx_name.item += "_primary_idx";
4110 } else {
4111 let index_name_col_suffix = keys
4114 .iter()
4115 .map(|k| match k {
4116 mz_expr::MirScalarExpr::Column(i, name) => {
4117 match (on_desc.get_unambiguous_name(*i), &name.0) {
4118 (Some(col_name), _) => col_name.to_string(),
4119 (None, Some(name)) => name.to_string(),
4120 (None, None) => format!("{}", i + 1),
4121 }
4122 }
4123 _ => "expr".to_string(),
4124 })
4125 .join("_");
4126 write!(idx_name.item, "_{index_name_col_suffix}_idx")
4127 .expect("write on strings cannot fail");
4128 idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4129 }
4130
4131 if !*if_not_exists {
4132 scx.catalog.find_available_name(idx_name)
4133 } else {
4134 idx_name
4135 }
4136 };
4137
4138 let full_name = scx.catalog.resolve_full_name(&index_name);
4140 let partial_name = PartialItemName::from(full_name.clone());
4141 if let (Ok(item), false, false) = (
4149 scx.catalog.resolve_item_or_type(&partial_name),
4150 *if_not_exists,
4151 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4152 ) {
4153 return Err(PlanError::ItemAlreadyExists {
4154 name: full_name.to_string(),
4155 item_type: item.item_type(),
4156 });
4157 }
4158
4159 let options = plan_index_options(scx, with_options.clone())?;
4160 let cluster_id = match in_cluster {
4161 None => scx.resolve_cluster(None)?.id(),
4162 Some(in_cluster) => in_cluster.id,
4163 };
4164
4165 *in_cluster = Some(ResolvedClusterName {
4166 id: cluster_id,
4167 print_name: None,
4168 });
4169
4170 *name = Some(Ident::new(index_name.item.clone())?);
4172 *key_parts = Some(filled_key_parts);
4173 let if_not_exists = *if_not_exists;
4174
4175 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4176 let compaction_window = options.iter().find_map(|o| {
4177 #[allow(irrefutable_let_patterns)]
4178 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4179 Some(lcw.clone())
4180 } else {
4181 None
4182 }
4183 });
4184
4185 Ok(Plan::CreateIndex(CreateIndexPlan {
4186 name: index_name,
4187 index: Index {
4188 create_sql,
4189 on: on.global_id(),
4190 keys,
4191 cluster_id,
4192 compaction_window,
4193 },
4194 if_not_exists,
4195 }))
4196}
4197
4198pub fn describe_create_type(
4199 _: &StatementContext,
4200 _: CreateTypeStatement<Aug>,
4201) -> Result<StatementDesc, PlanError> {
4202 Ok(StatementDesc::new(None))
4203}
4204
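// Illustrative usage for the variants handled below (LIST and MAP per the
// docs; the record form is only sketched by the code path):
//   CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
//   CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);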
4205pub fn plan_create_type(
4206 scx: &StatementContext,
4207 stmt: CreateTypeStatement<Aug>,
4208) -> Result<Plan, PlanError> {
4209 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4210 let CreateTypeStatement { name, as_type, .. } = stmt;
4211
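// Note: the `as_type` arguments below carry a trailing space ("LIST ", "MAP ")
// so the error reads "... AS LIST option ELEMENT TYPE ..."; the record variant
// passes "".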
4212 fn validate_data_type(
4213 scx: &StatementContext,
4214 data_type: ResolvedDataType,
4215 as_type: &str,
4216 key: &str,
4217 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4218 let (id, modifiers) = match data_type {
4219 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4220 _ => sql_bail!(
4221 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4222 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4223 as_type,
4224 key,
4225 data_type.human_readable_name(),
4226 ),
4227 };
4228
4229 let item = scx.catalog.get_item(&id);
4230 match item.type_details() {
4231 None => sql_bail!(
4232 "{} must be of class type, but received {} which is of class {}",
4233 key,
4234 scx.catalog.resolve_full_name(item.name()),
4235 item.item_type()
4236 ),
4237 Some(CatalogTypeDetails {
4238 typ: CatalogType::Char,
4239 ..
4240 }) => {
4241 bail_unsupported!("embedding char type in a list or map")
4242 }
4243 _ => {
4244 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4246
4247 Ok((id, modifiers))
4248 }
4249 }
4250 }
4251
4252 let inner = match as_type {
4253 CreateTypeAs::List { options } => {
4254 let CreateTypeListOptionExtracted {
4255 element_type,
4256 seen: _,
4257 } = CreateTypeListOptionExtracted::try_from(options)?;
4258 let element_type =
4259 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4260 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4261 CatalogType::List {
4262 element_reference: id,
4263 element_modifiers: modifiers,
4264 }
4265 }
4266 CreateTypeAs::Map { options } => {
4267 let CreateTypeMapOptionExtracted {
4268 key_type,
4269 value_type,
4270 seen: _,
4271 } = CreateTypeMapOptionExtracted::try_from(options)?;
4272 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4273 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4274 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4275 let (value_id, value_modifiers) =
4276 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4277 CatalogType::Map {
4278 key_reference: key_id,
4279 key_modifiers,
4280 value_reference: value_id,
4281 value_modifiers,
4282 }
4283 }
4284 CreateTypeAs::Record { column_defs } => {
4285 let mut fields = vec![];
4286 for column_def in column_defs {
4287 let data_type = column_def.data_type;
4288 let key = ident(column_def.name.clone());
4289 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4290 fields.push(CatalogRecordField {
4291 name: ColumnName::from(key.clone()),
4292 type_reference: id,
4293 type_modifiers: modifiers,
4294 });
4295 }
4296 CatalogType::Record { fields }
4297 }
4298 };
4299
4300 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4301
4302 let full_name = scx.catalog.resolve_full_name(&name);
4304 let partial_name = PartialItemName::from(full_name.clone());
4305 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4308 if item.item_type().conflicts_with_type() {
4309 return Err(PlanError::ItemAlreadyExists {
4310 name: full_name.to_string(),
4311 item_type: item.item_type(),
4312 });
4313 }
4314 }
4315
4316 Ok(Plan::CreateType(CreateTypePlan {
4317 name,
4318 typ: Type { create_sql, inner },
4319 }))
4320}
4321
4322generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4323
4324generate_extracted_config!(
4325 CreateTypeMapOption,
4326 (KeyType, ResolvedDataType),
4327 (ValueType, ResolvedDataType)
4328);
4329
4330#[derive(Debug)]
4331pub enum PlannedAlterRoleOption {
4332 Attributes(PlannedRoleAttributes),
4333 Variable(PlannedRoleVariable),
4334}
4335
4336#[derive(Debug, Clone)]
4337pub struct PlannedRoleAttributes {
4338 pub inherit: Option<bool>,
4339 pub password: Option<Password>,
4340 pub nopassword: Option<bool>,
4344 pub superuser: Option<bool>,
4345 pub login: Option<bool>,
4346}
4347
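// Fold the parsed role attributes into PlannedRoleAttributes, rejecting
// conflicting or redundant combinations and attributes that are never
// supported in favor of system privileges.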
4348fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4349 let mut planned_attributes = PlannedRoleAttributes {
4350 inherit: None,
4351 password: None,
4352 superuser: None,
4353 login: None,
4354 nopassword: None,
4355 };
4356
4357 for option in options {
4358 match option {
4359 RoleAttribute::Inherit | RoleAttribute::NoInherit
4360 if planned_attributes.inherit.is_some() =>
4361 {
4362 sql_bail!("conflicting or redundant options");
4363 }
4364 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4365 bail_never_supported!(
4366 "CREATECLUSTER attribute",
4367 "sql/create-role/#details",
4368 "Use system privileges instead."
4369 );
4370 }
4371 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4372 bail_never_supported!(
4373 "CREATEDB attribute",
4374 "sql/create-role/#details",
4375 "Use system privileges instead."
4376 );
4377 }
4378 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4379 bail_never_supported!(
4380 "CREATEROLE attribute",
4381 "sql/create-role/#details",
4382 "Use system privileges instead."
4383 );
4384 }
4385 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4386 sql_bail!("conflicting or redundant options");
4387 }
4388
4389 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4390 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4391 RoleAttribute::Password(password) => {
4392 if let Some(password) = password {
4393 planned_attributes.password = Some(password.into());
4394 } else {
4395 planned_attributes.nopassword = Some(true);
4396 }
4397 }
4398 RoleAttribute::SuperUser => {
4399 if planned_attributes.superuser == Some(false) {
4400 sql_bail!("conflicting or redundant options");
4401 }
4402 planned_attributes.superuser = Some(true);
4403 }
4404 RoleAttribute::NoSuperUser => {
4405 if planned_attributes.superuser == Some(true) {
4406 sql_bail!("conflicting or redundant options");
4407 }
4408 planned_attributes.superuser = Some(false);
4409 }
4410 RoleAttribute::Login => {
4411 if planned_attributes.login == Some(false) {
4412 sql_bail!("conflicting or redundant options");
4413 }
4414 planned_attributes.login = Some(true);
4415 }
4416 RoleAttribute::NoLogin => {
4417 if planned_attributes.login == Some(true) {
4418 sql_bail!("conflicting or redundant options");
4419 }
4420 planned_attributes.login = Some(false);
4421 }
4422 }
4423 }
4424 if planned_attributes.inherit == Some(false) {
4425 bail_unsupported!("non-inherit roles");
4426 }
4427
4428 Ok(planned_attributes)
4429}
4430
4431#[derive(Debug)]
4432pub enum PlannedRoleVariable {
4433 Set { name: String, value: VariableValue },
4434 Reset { name: String },
4435}
4436
4437impl PlannedRoleVariable {
4438 pub fn name(&self) -> &str {
4439 match self {
4440 PlannedRoleVariable::Set { name, .. } => name,
4441 PlannedRoleVariable::Reset { name } => name,
4442 }
4443 }
4444}
4445
4446fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4447 let plan = match variable {
4448 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4449 name: name.to_string(),
4450 value: scl::plan_set_variable_to(value)?,
4451 },
4452 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4453 name: name.to_string(),
4454 },
4455 };
4456 Ok(plan)
4457}
4458
4459pub fn describe_create_role(
4460 _: &StatementContext,
4461 _: CreateRoleStatement,
4462) -> Result<StatementDesc, PlanError> {
4463 Ok(StatementDesc::new(None))
4464}
4465
4466pub fn plan_create_role(
4467 _: &StatementContext,
4468 CreateRoleStatement { name, options }: CreateRoleStatement,
4469) -> Result<Plan, PlanError> {
4470 let attributes = plan_role_attributes(options)?;
4471 Ok(Plan::CreateRole(CreateRolePlan {
4472 name: normalize::ident(name),
4473 attributes: attributes.into(),
4474 }))
4475}
4476
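// Illustrative usage (assumed syntax, not verified against the parser):
//   CREATE NETWORK POLICY office
//     (RULES (ny (DIRECTION = 'ingress', ACTION = 'allow', ADDRESS = '1.2.3.4/28')));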
4477pub fn plan_create_network_policy(
4478 ctx: &StatementContext,
4479 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4480) -> Result<Plan, PlanError> {
4481 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4482 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4483
4484 let Some(rule_defs) = policy_options.rules else {
4485 sql_bail!("RULES must be specified when creating network policies.");
4486 };
4487
4488 let mut rules = vec![];
4489 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4490 let NetworkPolicyRuleOptionExtracted {
4491 seen: _,
4492 direction,
4493 action,
4494 address,
4495 } = options.try_into()?;
4496 let (direction, action, address) = match (direction, action, address) {
4497 (Some(direction), Some(action), Some(address)) => (
4498 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4499 NetworkPolicyRuleAction::try_from(action.as_str())?,
4500 PolicyAddress::try_from(address.as_str())?,
4501 ),
4502 (_, _, _) => {
4503 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4504 }
4505 };
4506 rules.push(NetworkPolicyRule {
4507 name: normalize::ident(name),
4508 direction,
4509 action,
4510 address,
4511 });
4512 }
4513
4514 if rules.len()
4515 > ctx
4516 .catalog
4517 .system_vars()
4518 .max_rules_per_network_policy()
4519 .try_into()?
4520 {
4521 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4522 }
4523
4524 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4525 name: normalize::ident(name),
4526 rules,
4527 }))
4528}
4529
4530pub fn plan_alter_network_policy(
4531 ctx: &StatementContext,
4532 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4533) -> Result<Plan, PlanError> {
4534 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4535
4536 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4537 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4538
4539 let Some(rule_defs) = policy_options.rules else {
4540 sql_bail!("RULES must be specified when creating network policies.");
4541 };
4542
4543 let mut rules = vec![];
4544 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4545 let NetworkPolicyRuleOptionExtracted {
4546 seen: _,
4547 direction,
4548 action,
4549 address,
4550 } = options.try_into()?;
4551
4552 let (direction, action, address) = match (direction, action, address) {
4553 (Some(direction), Some(action), Some(address)) => (
4554 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4555 NetworkPolicyRuleAction::try_from(action.as_str())?,
4556 PolicyAddress::try_from(address.as_str())?,
4557 ),
4558 (_, _, _) => {
4559 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4560 }
4561 };
4562 rules.push(NetworkPolicyRule {
4563 name: normalize::ident(name),
4564 direction,
4565 action,
4566 address,
4567 });
4568 }
4569 if rules.len()
4570 > ctx
4571 .catalog
4572 .system_vars()
4573 .max_rules_per_network_policy()
4574 .try_into()?
4575 {
4576 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4577 }
4578
4579 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4580 id: policy.id(),
4581 name: normalize::ident(name),
4582 rules,
4583 }))
4584}
4585
4586pub fn describe_create_cluster(
4587 _: &StatementContext,
4588 _: CreateClusterStatement<Aug>,
4589) -> Result<StatementDesc, PlanError> {
4590 Ok(StatementDesc::new(None))
4591}
4592
4593generate_extracted_config!(
4599 ClusterOption,
4600 (AvailabilityZones, Vec<String>),
4601 (Disk, bool),
4602 (IntrospectionDebugging, bool),
4603 (IntrospectionInterval, OptionalDuration),
4604 (Managed, bool),
4605 (Replicas, Vec<ReplicaDefinition<Aug>>),
4606 (ReplicationFactor, u32),
4607 (Size, String),
4608 (Schedule, ClusterScheduleOptionValue),
4609 (WorkloadClass, OptionalString)
4610);
4611
4612generate_extracted_config!(
4613 NetworkPolicyOption,
4614 (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4615);
4616
4617generate_extracted_config!(
4618 NetworkPolicyRuleOption,
4619 (Direction, String),
4620 (Action, String),
4621 (Address, String)
4622);
4623
4624generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4625
4626generate_extracted_config!(
4627 ClusterAlterUntilReadyOption,
4628 (Timeout, Duration),
4629 (OnTimeout, String)
4630);
4631
4632generate_extracted_config!(
4633 ClusterFeature,
4634 (ReoptimizeImportedViews, Option<bool>, Default(None)),
4635 (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4636 (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4637 (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4638 (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4639 (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4640 (
4641 EnableProjectionPushdownAfterRelationCse,
4642 Option<bool>,
4643 Default(None)
4644 )
4645);
4646
4647pub fn plan_create_cluster(
4651 scx: &StatementContext,
4652 stmt: CreateClusterStatement<Aug>,
4653) -> Result<Plan, PlanError> {
4654 let plan = plan_create_cluster_inner(scx, stmt)?;
4655
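// Round-trip check: unplan the managed-cluster plan back to SQL, reparse,
// re-resolve, and replan; any mismatch indicates that unplan_create_cluster is
// not a faithful inverse of planning.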
4656 if let CreateClusterVariant::Managed(_) = &plan.variant {
4658 let stmt = unplan_create_cluster(scx, plan.clone())
4659 .map_err(|e| PlanError::Replan(e.to_string()))?;
4660 let create_sql = stmt.to_ast_string_stable();
4661 let stmt = parse::parse(&create_sql)
4662 .map_err(|e| PlanError::Replan(e.to_string()))?
4663 .into_element()
4664 .ast;
4665 let (stmt, _resolved_ids) =
4666 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4667 let stmt = match stmt {
4668 Statement::CreateCluster(stmt) => stmt,
4669 stmt => {
4670 return Err(PlanError::Replan(format!(
4671 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4672 )));
4673 }
4674 };
4675 let replan =
4676 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4677 if plan != replan {
4678 return Err(PlanError::Replan(format!(
4679 "replan does not match: plan={plan:?}, replan={replan:?}"
4680 )));
4681 }
4682 }
4683
4684 Ok(Plan::CreateCluster(plan))
4685}
4686
4687pub fn plan_create_cluster_inner(
4688 scx: &StatementContext,
4689 CreateClusterStatement {
4690 name,
4691 options,
4692 features,
4693 }: CreateClusterStatement<Aug>,
4694) -> Result<CreateClusterPlan, PlanError> {
4695 let ClusterOptionExtracted {
4696 availability_zones,
4697 introspection_debugging,
4698 introspection_interval,
4699 managed,
4700 replicas,
4701 replication_factor,
4702 seen: _,
4703 size,
4704 disk,
4705 schedule,
4706 workload_class,
4707 }: ClusterOptionExtracted = options.try_into()?;
4708
4709 let managed = managed.unwrap_or_else(|| replicas.is_none());
4710
4711 if !scx.catalog.active_role_id().is_system() {
4712 if !features.is_empty() {
4713 sql_bail!("FEATURES not supported for non-system users");
4714 }
4715 if workload_class.is_some() {
4716 sql_bail!("WORKLOAD CLASS not supported for non-system users");
4717 }
4718 }
4719
4720 let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4721 let workload_class = workload_class.and_then(|v| v.0);
4722
4723 if managed {
4724 if replicas.is_some() {
4725 sql_bail!("REPLICAS not supported for managed clusters");
4726 }
4727 let Some(size) = size else {
4728 sql_bail!("SIZE must be specified for managed clusters");
4729 };
4730
4731 if disk.is_some() {
4732 if scx.catalog.is_cluster_size_cc(&size) {
4736 sql_bail!(
4737 "DISK option not supported for modern cluster sizes because disk is always enabled"
4738 );
4739 }
4740
4741 scx.catalog
4742 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4743 }
4744
4745 let compute = plan_compute_replica_config(
4746 introspection_interval,
4747 introspection_debugging.unwrap_or(false),
4748 )?;
4749
4750 let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4751 replication_factor.unwrap_or_else(|| {
4752 scx.catalog
4753 .system_vars()
4754 .default_cluster_replication_factor()
4755 })
4756 } else {
4757 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4758 if replication_factor.is_some() {
4759 sql_bail!(
4760 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4761 );
4762 }
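// Refresh-scheduled clusters are created with replication factor 0; the
// scheduling policy later brings replicas up for refreshes (the unplan path
// asserts the factor stays <= 1).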
4763 0
4767 };
4768 let availability_zones = availability_zones.unwrap_or_default();
4769
4770 if !availability_zones.is_empty() {
4771 scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4772 }
4773
4774 let ClusterFeatureExtracted {
4776 reoptimize_imported_views,
4777 enable_eager_delta_joins,
4778 enable_new_outer_join_lowering,
4779 enable_variadic_left_join_lowering,
4780 enable_letrec_fixpoint_analysis,
4781 enable_join_prioritize_arranged,
4782 enable_projection_pushdown_after_relation_cse,
4783 seen: _,
4784 } = ClusterFeatureExtracted::try_from(features)?;
4785 let optimizer_feature_overrides = OptimizerFeatureOverrides {
4786 reoptimize_imported_views,
4787 enable_eager_delta_joins,
4788 enable_new_outer_join_lowering,
4789 enable_variadic_left_join_lowering,
4790 enable_letrec_fixpoint_analysis,
4791 enable_join_prioritize_arranged,
4792 enable_projection_pushdown_after_relation_cse,
4793 ..Default::default()
4794 };
4795
4796 let schedule = plan_cluster_schedule(schedule)?;
4797
4798 Ok(CreateClusterPlan {
4799 name: normalize::ident(name),
4800 variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4801 replication_factor,
4802 size,
4803 availability_zones,
4804 compute,
4805 optimizer_feature_overrides,
4806 schedule,
4807 }),
4808 workload_class,
4809 })
4810 } else {
4811 let Some(replica_defs) = replicas else {
4812 sql_bail!("REPLICAS must be specified for unmanaged clusters");
4813 };
4814 if availability_zones.is_some() {
4815 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4816 }
4817 if replication_factor.is_some() {
4818 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4819 }
4820 if introspection_debugging.is_some() {
4821 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4822 }
4823 if introspection_interval.is_some() {
4824 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4825 }
4826 if size.is_some() {
4827 sql_bail!("SIZE not supported for unmanaged clusters");
4828 }
4829 if disk.is_some() {
4830 sql_bail!("DISK not supported for unmanaged clusters");
4831 }
4832 if !features.is_empty() {
4833 sql_bail!("FEATURES not supported for unmanaged clusters");
4834 }
4835 if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4836 sql_bail!(
4837 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4838 );
4839 }
4840
4841 let mut replicas = vec![];
4842 for ReplicaDefinition { name, options } in replica_defs {
4843 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4844 }
4845
4846 Ok(CreateClusterPlan {
4847 name: normalize::ident(name),
4848 variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4849 workload_class,
4850 })
4851 }
4852}
4853
4854pub fn unplan_create_cluster(
4858 scx: &StatementContext,
4859 CreateClusterPlan {
4860 name,
4861 variant,
4862 workload_class,
4863 }: CreateClusterPlan,
4864) -> Result<CreateClusterStatement<Aug>, PlanError> {
4865 match variant {
4866 CreateClusterVariant::Managed(CreateClusterManagedPlan {
4867 replication_factor,
4868 size,
4869 availability_zones,
4870 compute,
4871 optimizer_feature_overrides,
4872 schedule,
4873 }) => {
4874 let schedule = unplan_cluster_schedule(schedule);
4875 let OptimizerFeatureOverrides {
4876 enable_guard_subquery_tablefunc: _,
4877 enable_consolidate_after_union_negate: _,
4878 enable_reduce_mfp_fusion: _,
4879 enable_cardinality_estimates: _,
4880 persist_fast_path_limit: _,
4881 reoptimize_imported_views,
4882 enable_eager_delta_joins,
4883 enable_new_outer_join_lowering,
4884 enable_variadic_left_join_lowering,
4885 enable_letrec_fixpoint_analysis,
4886 enable_reduce_reduction: _,
4887 enable_join_prioritize_arranged,
4888 enable_projection_pushdown_after_relation_cse,
4889 enable_less_reduce_in_eqprop: _,
4890 enable_dequadratic_eqprop_map: _,
4891 enable_eq_classes_withholding_errors: _,
4892 enable_fast_path_plan_insights: _,
4893 } = optimizer_feature_overrides;
4894 let features_extracted = ClusterFeatureExtracted {
4896 seen: Default::default(),
4898 reoptimize_imported_views,
4899 enable_eager_delta_joins,
4900 enable_new_outer_join_lowering,
4901 enable_variadic_left_join_lowering,
4902 enable_letrec_fixpoint_analysis,
4903 enable_join_prioritize_arranged,
4904 enable_projection_pushdown_after_relation_cse,
4905 };
4906 let features = features_extracted.into_values(scx.catalog);
4907 let availability_zones = if availability_zones.is_empty() {
4908 None
4909 } else {
4910 Some(availability_zones)
4911 };
4912 let (introspection_interval, introspection_debugging) =
4913 unplan_compute_replica_config(compute);
4914 let replication_factor = match &schedule {
4917 ClusterScheduleOptionValue::Manual => Some(replication_factor),
4918 ClusterScheduleOptionValue::Refresh { .. } => {
4919 assert!(
4920 replication_factor <= 1,
4921 "replication factor, {replication_factor:?}, must be <= 1"
4922 );
4923 None
4924 }
4925 };
4926 let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4927 let options_extracted = ClusterOptionExtracted {
4928 seen: Default::default(),
4930 availability_zones,
4931 disk: None,
4932 introspection_debugging: Some(introspection_debugging),
4933 introspection_interval,
4934 managed: Some(true),
4935 replicas: None,
4936 replication_factor,
4937 size: Some(size),
4938 schedule: Some(schedule),
4939 workload_class,
4940 };
4941 let options = options_extracted.into_values(scx.catalog);
4942 let name = Ident::new_unchecked(name);
4943 Ok(CreateClusterStatement {
4944 name,
4945 options,
4946 features,
4947 })
4948 }
4949 CreateClusterVariant::Unmanaged(_) => {
4950 bail_unsupported!("SHOW CREATE for unmanaged clusters")
4951 }
4952 }
4953}
4954
4955generate_extracted_config!(
4956 ReplicaOption,
4957 (AvailabilityZone, String),
4958 (BilledAs, String),
4959 (ComputeAddresses, Vec<String>),
4960 (ComputectlAddresses, Vec<String>),
4961 (Disk, bool),
4962 (Internal, bool, Default(false)),
4963 (IntrospectionDebugging, bool, Default(false)),
4964 (IntrospectionInterval, OptionalDuration),
4965 (Size, String),
4966 (StorageAddresses, Vec<String>),
4967 (StoragectlAddresses, Vec<String>),
4968 (Workers, u16)
4969);
4970
4971fn plan_replica_config(
4972 scx: &StatementContext,
4973 options: Vec<ReplicaOption<Aug>>,
4974) -> Result<ReplicaConfig, PlanError> {
4975 let ReplicaOptionExtracted {
4976 availability_zone,
4977 billed_as,
4978 computectl_addresses,
4979 disk,
4980 internal,
4981 introspection_debugging,
4982 introspection_interval,
4983 size,
4984 storagectl_addresses,
4985 ..
4986 }: ReplicaOptionExtracted = options.try_into()?;
4987
4988 let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4989
4990 match (
4991 size,
4992 availability_zone,
4993 billed_as,
4994 storagectl_addresses,
4995 computectl_addresses,
4996 ) {
4997 (None, _, None, None, None) => {
4999 sql_bail!("SIZE option must be specified");
5002 }
5003 (Some(size), availability_zone, billed_as, None, None) => {
5004 if disk.is_some() {
5005 if scx.catalog.is_cluster_size_cc(&size) {
5009 sql_bail!(
5010 "DISK option not supported for modern cluster sizes because disk is always enabled"
5011 );
5012 }
5013
5014 scx.catalog
5015 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5016 }
5017
5018 Ok(ReplicaConfig::Orchestrated {
5019 size,
5020 availability_zone,
5021 compute,
5022 billed_as,
5023 internal,
5024 })
5025 }
5026
5027 (None, None, None, storagectl_addresses, computectl_addresses) => {
5028 scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5029
5030 let Some(storagectl_addrs) = storagectl_addresses else {
5034 sql_bail!("missing STORAGECTL ADDRESSES option");
5035 };
5036 let Some(computectl_addrs) = computectl_addresses else {
5037 sql_bail!("missing COMPUTECTL ADDRESSES option");
5038 };
5039
5040 if storagectl_addrs.len() != computectl_addrs.len() {
5041 sql_bail!(
5042 "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5043 );
5044 }
5045
5046 if disk.is_some() {
5047 sql_bail!("DISK can't be specified for unorchestrated clusters");
5048 }
5049
5050 Ok(ReplicaConfig::Unorchestrated {
5051 storagectl_addrs,
5052 computectl_addrs,
5053 compute,
5054 })
5055 }
5056 _ => {
5057 sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5060 }
5061 }
5062}
5063
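// Build the replica introspection config. INTROSPECTION DEBUGGING requires a
// non-null INTROSPECTION INTERVAL; when the interval option is omitted
// entirely, the default replica logging interval applies.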
5064fn plan_compute_replica_config(
5068 introspection_interval: Option<OptionalDuration>,
5069 introspection_debugging: bool,
5070) -> Result<ComputeReplicaConfig, PlanError> {
5071 let introspection_interval = introspection_interval
5072 .map(|OptionalDuration(i)| i)
5073 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5074 let introspection = match introspection_interval {
5075 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5076 interval,
5077 debugging: introspection_debugging,
5078 }),
5079 None if introspection_debugging => {
5080 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5081 }
5082 None => None,
5083 };
5084 let compute = ComputeReplicaConfig { introspection };
5085 Ok(compute)
5086}
5087
5088fn unplan_compute_replica_config(
5092 compute_replica_config: ComputeReplicaConfig,
5093) -> (Option<OptionalDuration>, bool) {
5094 match compute_replica_config.introspection {
5095 Some(ComputeReplicaIntrospectionConfig {
5096 debugging,
5097 interval,
5098 }) => (Some(OptionalDuration(Some(interval))), debugging),
5099 None => (Some(OptionalDuration(None)), false),
5100 }
5101}
5102
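// Convert the parsed SCHEDULE option, validating that HYDRATION TIME ESTIMATE
// is a non-negative duration in day-or-smaller units, representable both as
// milliseconds and as an Interval.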
5103fn plan_cluster_schedule(
5107 schedule: ClusterScheduleOptionValue,
5108) -> Result<ClusterSchedule, PlanError> {
5109 Ok(match schedule {
5110 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5111 ClusterScheduleOptionValue::Refresh {
5113 hydration_time_estimate: None,
5114 } => ClusterSchedule::Refresh {
5115 hydration_time_estimate: Duration::from_millis(0),
5116 },
5117 ClusterScheduleOptionValue::Refresh {
5119 hydration_time_estimate: Some(interval_value),
5120 } => {
5121 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5122 if interval.as_microseconds() < 0 {
5123 sql_bail!(
5124 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5125 interval
5126 );
5127 }
5128 if interval.months != 0 {
5129 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5133 }
5134 let duration = interval.duration()?;
5135 if u64::try_from(duration.as_millis()).is_err()
5136 || Interval::from_duration(&duration).is_err()
5137 {
5138 sql_bail!("HYDRATION TIME ESTIMATE too large");
5139 }
5140 ClusterSchedule::Refresh {
5141 hydration_time_estimate: duration,
5142 }
5143 }
5144 })
5145}
5146
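// Inverse of plan_cluster_schedule; planning guaranteed the estimate converts
// back to an Interval, hence the expect below.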
5147fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5151 match schedule {
5152 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5153 ClusterSchedule::Refresh {
5154 hydration_time_estimate,
5155 } => {
5156 let interval = Interval::from_duration(&hydration_time_estimate)
5157 .expect("planning ensured that this is convertible back to Interval");
5158 let interval_value = literal::unplan_interval(&interval);
5159 ClusterScheduleOptionValue::Refresh {
5160 hydration_time_estimate: Some(interval_value),
5161 }
5162 }
5163 }
5164}
5165
5166pub fn describe_create_cluster_replica(
5167 _: &StatementContext,
5168 _: CreateClusterReplicaStatement<Aug>,
5169) -> Result<StatementDesc, PlanError> {
5170 Ok(StatementDesc::new(None))
5171}
5172
5173pub fn plan_create_cluster_replica(
5174 scx: &StatementContext,
5175 CreateClusterReplicaStatement {
5176 definition: ReplicaDefinition { name, options },
5177 of_cluster,
5178 }: CreateClusterReplicaStatement<Aug>,
5179) -> Result<Plan, PlanError> {
5180 let cluster = scx
5181 .catalog
5182 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5183 let current_replica_count = cluster.replica_ids().iter().count();
5184 if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5185 let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5186 return Err(PlanError::CreateReplicaFailStorageObjects {
5187 current_replica_count,
5188 internal_replica_count,
5189 hypothetical_replica_count: current_replica_count + 1,
5190 });
5191 }
5192
5193 let config = plan_replica_config(scx, options)?;
5194
5195 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5196 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5197 return Err(PlanError::MangedReplicaName(name.into_string()));
5198 }
5199 } else {
5200 ensure_cluster_is_not_managed(scx, cluster.id())?;
5201 }
5202
5203 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5204 name: normalize::ident(name),
5205 cluster_id: cluster.id(),
5206 config,
5207 }))
5208}
5209
5210pub fn describe_create_secret(
5211 _: &StatementContext,
5212 _: CreateSecretStatement<Aug>,
5213) -> Result<StatementDesc, PlanError> {
5214 Ok(StatementDesc::new(None))
5215}
5216
5217pub fn plan_create_secret(
5218 scx: &StatementContext,
5219 stmt: CreateSecretStatement<Aug>,
5220) -> Result<Plan, PlanError> {
5221 let CreateSecretStatement {
5222 name,
5223 if_not_exists,
5224 value,
5225 } = &stmt;
5226
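// Persist a redacted copy of the statement: the secret value is replaced with
// "********" before normalizing into create_sql.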
5227 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5228 let mut create_sql_statement = stmt.clone();
5229 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5230 let create_sql =
5231 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5232 let secret_as = query::plan_secret_as(scx, value.clone())?;
5233
5234 let secret = Secret {
5235 create_sql,
5236 secret_as,
5237 };
5238
5239 Ok(Plan::CreateSecret(CreateSecretPlan {
5240 name,
5241 secret,
5242 if_not_exists: *if_not_exists,
5243 }))
5244}
5245
5246pub fn describe_create_connection(
5247 _: &StatementContext,
5248 _: CreateConnectionStatement<Aug>,
5249) -> Result<StatementDesc, PlanError> {
5250 Ok(StatementDesc::new(None))
5251}
5252
5253generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5254
5255pub fn plan_create_connection(
5256 scx: &StatementContext,
5257 mut stmt: CreateConnectionStatement<Aug>,
5258) -> Result<Plan, PlanError> {
5259 let CreateConnectionStatement {
5260 name,
5261 connection_type,
5262 values,
5263 if_not_exists,
5264 with_options,
5265 } = stmt.clone();
5266 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5267 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5268 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5269
5270 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5271 if options.validate.is_some() {
5272 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5273 }
5274 let validate = match options.validate {
5275 Some(val) => val,
5276 None => {
5277 scx.catalog
5278 .system_vars()
5279 .enable_default_connection_validation()
5280 && details.to_connection().validate_by_default()
5281 }
5282 };
5283
5284 let full_name = scx.catalog.resolve_full_name(&name);
5286 let partial_name = PartialItemName::from(full_name.clone());
5287 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5288 return Err(PlanError::ItemAlreadyExists {
5289 name: full_name.to_string(),
5290 item_type: item.item_type(),
5291 });
5292 }
5293
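// For SSH connections, reflect the key pair carried in the connection details
// back into the statement as PUBLIC KEY 1/2 options so create_sql records the
// derived public keys.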
5294 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5297 stmt.values.retain(|v| {
5298 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5299 });
5300 stmt.values.push(ConnectionOption {
5301 name: ConnectionOptionName::PublicKey1,
5302 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5303 });
5304 stmt.values.push(ConnectionOption {
5305 name: ConnectionOptionName::PublicKey2,
5306 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5307 });
5308 }
5309 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5310
5311 let plan = CreateConnectionPlan {
5312 name,
5313 if_not_exists,
5314 connection: crate::plan::Connection {
5315 create_sql,
5316 details,
5317 },
5318 validate,
5319 };
5320 Ok(Plan::CreateConnection(plan))
5321}
5322
5323fn plan_drop_database(
5324 scx: &StatementContext,
5325 if_exists: bool,
5326 name: &UnresolvedDatabaseName,
5327 cascade: bool,
5328) -> Result<Option<DatabaseId>, PlanError> {
5329 Ok(match resolve_database(scx, name, if_exists)? {
5330 Some(database) => {
5331 if !cascade && database.has_schemas() {
5332 sql_bail!(
5333 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5334 name,
5335 );
5336 }
5337 Some(database.id())
5338 }
5339 None => None,
5340 })
5341}
5342
5343pub fn describe_drop_objects(
5344 _: &StatementContext,
5345 _: DropObjectsStatement,
5346) -> Result<StatementDesc, PlanError> {
5347 Ok(StatementDesc::new(None))
5348}
5349
5350pub fn plan_drop_objects(
5351 scx: &mut StatementContext,
5352 DropObjectsStatement {
5353 object_type,
5354 if_exists,
5355 names,
5356 cascade,
5357 }: DropObjectsStatement,
5358) -> Result<Plan, PlanError> {
5359 assert_ne!(
5360 object_type,
5361 mz_sql_parser::ast::ObjectType::Func,
5362 "rejected in parser"
5363 );
5364 let object_type = object_type.into();
5365
5366 let mut referenced_ids = Vec::new();
5367 for name in names {
5368 let id = match &name {
5369 UnresolvedObjectName::Cluster(name) => {
5370 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5371 }
5372 UnresolvedObjectName::ClusterReplica(name) => {
5373 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5374 }
5375 UnresolvedObjectName::Database(name) => {
5376 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5377 }
5378 UnresolvedObjectName::Schema(name) => {
5379 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5380 }
5381 UnresolvedObjectName::Role(name) => {
5382 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5383 }
5384 UnresolvedObjectName::Item(name) => {
5385 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5386 .map(ObjectId::Item)
5387 }
5388 UnresolvedObjectName::NetworkPolicy(name) => {
5389 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5390 }
5391 };
5392 match id {
5393 Some(id) => referenced_ids.push(id),
5394 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5395 name: name.to_ast_string_simple(),
5396 object_type,
5397 }),
5398 }
5399 }
5400 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5401
5402 Ok(Plan::DropObjects(DropObjectsPlan {
5403 referenced_ids,
5404 drop_ids,
5405 object_type,
5406 }))
5407}
5408
5409fn plan_drop_schema(
5410 scx: &StatementContext,
5411 if_exists: bool,
5412 name: &UnresolvedSchemaName,
5413 cascade: bool,
5414) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5415 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5416 Some((database_spec, schema_spec)) => {
5417 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5418 sql_bail!(
5419 "cannot drop schema {name} because it is required by the database system",
5420 );
5421 }
5422 if let SchemaSpecifier::Temporary = schema_spec {
5423 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5424 }
5425 let schema = scx.get_schema(&database_spec, &schema_spec);
5426 if !cascade && schema.has_items() {
5427 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5428 sql_bail!(
5429 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5430 full_schema_name
5431 );
5432 }
5433 Some((database_spec, schema_spec))
5434 }
5435 None => None,
5436 })
5437}
5438
5439fn plan_drop_role(
5440 scx: &StatementContext,
5441 if_exists: bool,
5442 name: &Ident,
5443) -> Result<Option<RoleId>, PlanError> {
5444 match scx.catalog.resolve_role(name.as_str()) {
5445 Ok(role) => {
5446 let id = role.id();
5447 if &id == scx.catalog.active_role_id() {
5448 sql_bail!("current role cannot be dropped");
5449 }
5450 for role in scx.catalog.get_roles() {
5451 for (member_id, grantor_id) in role.membership() {
5452 if &id == grantor_id {
5453 let member_role = scx.catalog.get_role(member_id);
5454 sql_bail!(
5455 "cannot drop role {}: still depended up by membership of role {} in role {}",
5456 name.as_str(),
5457 role.name(),
5458 member_role.name()
5459 );
5460 }
5461 }
5462 }
5463 Ok(Some(role.id()))
5464 }
5465 Err(_) if if_exists => Ok(None),
5466 Err(e) => Err(e.into()),
5467 }
5468}
5469
5470fn plan_drop_cluster(
5471 scx: &StatementContext,
5472 if_exists: bool,
5473 name: &Ident,
5474 cascade: bool,
5475) -> Result<Option<ClusterId>, PlanError> {
5476 Ok(match resolve_cluster(scx, name, if_exists)? {
5477 Some(cluster) => {
5478 if !cascade && !cluster.bound_objects().is_empty() {
5479 return Err(PlanError::DependentObjectsStillExist {
5480 object_type: "cluster".to_string(),
5481 object_name: cluster.name().to_string(),
5482 dependents: Vec::new(),
5483 });
5484 }
5485 Some(cluster.id())
5486 }
5487 None => None,
5488 })
5489}
5490
5491fn plan_drop_network_policy(
5492 scx: &StatementContext,
5493 if_exists: bool,
5494 name: &Ident,
5495) -> Result<Option<NetworkPolicyId>, PlanError> {
5496 match scx.catalog.resolve_network_policy(name.as_str()) {
5497 Ok(policy) => {
5498 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5501 Err(PlanError::NetworkPolicyInUse)
5502 } else {
5503 Ok(Some(policy.id()))
5504 }
5505 }
5506 Err(_) if if_exists => Ok(None),
5507 Err(e) => Err(e.into()),
5508 }
5509}
5510
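// Whether the cluster hosts objects limited to a single replica (sources and
// sinks), unless the multi-replica-sources dyncfg lifts that restriction.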
5511fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5514 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5516 false
5517 } else {
5518 cluster.bound_objects().iter().any(|id| {
5520 let item = scx.catalog.get_item(id);
5521 matches!(
5522 item.item_type(),
5523 CatalogItemType::Sink | CatalogItemType::Source
5524 )
5525 })
5526 }
5527}
5528
5529fn plan_drop_cluster_replica(
5530 scx: &StatementContext,
5531 if_exists: bool,
5532 name: &QualifiedReplica,
5533) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5534 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5535 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5536}
5537
5538fn plan_drop_item(
5540 scx: &StatementContext,
5541 object_type: ObjectType,
5542 if_exists: bool,
5543 name: UnresolvedItemName,
5544 cascade: bool,
5545) -> Result<Option<CatalogItemId>, PlanError> {
5546 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5547 Ok(r) => r,
5548 Err(PlanError::MismatchedObjectType {
5550 name,
5551 is_type: ObjectType::MaterializedView,
5552 expected_type: ObjectType::View,
5553 }) => {
5554 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5555 }
5556 e => e?,
5557 };
5558
5559 Ok(match resolved {
5560 Some(catalog_item) => {
5561 if catalog_item.id().is_system() {
5562 sql_bail!(
5563 "cannot drop {} {} because it is required by the database system",
5564 catalog_item.item_type(),
5565 scx.catalog.minimal_qualification(catalog_item.name()),
5566 );
5567 }
5568
5569 if !cascade {
5570 for id in catalog_item.used_by() {
5571 let dep = scx.catalog.get_item(id);
5572 if dependency_prevents_drop(object_type, dep) {
5573 return Err(PlanError::DependentObjectsStillExist {
5574 object_type: catalog_item.item_type().to_string(),
5575 object_name: scx
5576 .catalog
5577 .minimal_qualification(catalog_item.name())
5578 .to_string(),
5579 dependents: vec![(
5580 dep.item_type().to_string(),
5581 scx.catalog.minimal_qualification(dep.name()).to_string(),
5582 )],
5583 });
5584 }
5585 }
5586 }
5589 Some(catalog_item.id())
5590 }
5591 None => None,
5592 })
5593}
5594
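// Whether `dep` blocks a non-CASCADE drop: dropping a type is blocked by any
// dependent, while for other object types every dependent except an index
// blocks the drop.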
5595fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5597 match object_type {
5598 ObjectType::Type => true,
5599 _ => match dep.item_type() {
5600 CatalogItemType::Func
5601 | CatalogItemType::Table
5602 | CatalogItemType::Source
5603 | CatalogItemType::View
5604 | CatalogItemType::MaterializedView
5605 | CatalogItemType::Sink
5606 | CatalogItemType::Type
5607 | CatalogItemType::Secret
5608 | CatalogItemType::Connection
5609 | CatalogItemType::ContinualTask => true,
5610 CatalogItemType::Index => false,
5611 },
5612 }
5613}
5614
5615pub fn describe_alter_index_options(
5616 _: &StatementContext,
5617 _: AlterIndexStatement<Aug>,
5618) -> Result<StatementDesc, PlanError> {
5619 Ok(StatementDesc::new(None))
5620}
5621
5622pub fn describe_drop_owned(
5623 _: &StatementContext,
5624 _: DropOwnedStatement<Aug>,
5625) -> Result<StatementDesc, PlanError> {
5626 Ok(StatementDesc::new(None))
5627}
5628
5629pub fn plan_drop_owned(
5630 scx: &StatementContext,
5631 drop: DropOwnedStatement<Aug>,
5632) -> Result<Plan, PlanError> {
5633 let cascade = drop.cascade();
5634 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5635 let mut drop_ids = Vec::new();
5636 let mut privilege_revokes = Vec::new();
5637 let mut default_privilege_revokes = Vec::new();
5638
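// Queue revokes for every privilege on `object_id` whose grantee is one of the
// dropped roles.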
5639 fn update_privilege_revokes(
5640 object_id: SystemObjectId,
5641 privileges: &PrivilegeMap,
5642 role_ids: &BTreeSet<RoleId>,
5643 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5644 ) {
5645 privilege_revokes.extend(iter::zip(
5646 iter::repeat(object_id),
5647 privileges
5648 .all_values()
5649 .filter(|privilege| role_ids.contains(&privilege.grantee))
5650 .cloned(),
5651 ));
5652 }
5653
5654 for replica in scx.catalog.get_cluster_replicas() {
5656 if role_ids.contains(&replica.owner_id()) {
5657 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5658 }
5659 }
5660
5661 for cluster in scx.catalog.get_clusters() {
5663 if role_ids.contains(&cluster.owner_id()) {
5664 if !cascade {
5666 let non_owned_bound_objects: Vec<_> = cluster
5667 .bound_objects()
5668 .into_iter()
5669 .map(|item_id| scx.catalog.get_item(item_id))
5670 .filter(|item| !role_ids.contains(&item.owner_id()))
5671 .collect();
5672 if !non_owned_bound_objects.is_empty() {
5673 let names: Vec<_> = non_owned_bound_objects
5674 .into_iter()
5675 .map(|item| {
5676 (
5677 item.item_type().to_string(),
5678 scx.catalog.resolve_full_name(item.name()).to_string(),
5679 )
5680 })
5681 .collect();
5682 return Err(PlanError::DependentObjectsStillExist {
5683 object_type: "cluster".to_string(),
5684 object_name: cluster.name().to_string(),
5685 dependents: names,
5686 });
5687 }
5688 }
5689 drop_ids.push(cluster.id().into());
5690 }
5691 update_privilege_revokes(
5692 SystemObjectId::Object(cluster.id().into()),
5693 cluster.privileges(),
5694 &role_ids,
5695 &mut privilege_revokes,
5696 );
5697 }
5698
5699 for item in scx.catalog.get_items() {
5701 if role_ids.contains(&item.owner_id()) {
5702 if !cascade {
5703 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5705 let non_owned_dependencies: Vec<_> = used_by
5706 .into_iter()
5707 .map(|item_id| scx.catalog.get_item(item_id))
5708 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5709 .filter(|item| !role_ids.contains(&item.owner_id()))
5710 .collect();
5711 if !non_owned_dependencies.is_empty() {
5712 let names: Vec<_> = non_owned_dependencies
5713 .into_iter()
5714 .map(|item| {
5715 let item_typ = item.item_type().to_string();
5716 let item_name =
5717 scx.catalog.resolve_full_name(item.name()).to_string();
5718 (item_typ, item_name)
5719 })
5720 .collect();
5721 Err(PlanError::DependentObjectsStillExist {
5722 object_type: item.item_type().to_string(),
5723 object_name: scx
5724 .catalog
5725 .resolve_full_name(item.name())
5726 .to_string(),
5728 dependents: names,
5729 })
5730 } else {
5731 Ok(())
5732 }
5733 };
5734
5735 if let Some(id) = item.progress_id() {
5738 let progress_item = scx.catalog.get_item(&id);
5739 check_if_dependents_exist(progress_item.used_by())?;
5740 }
5741 check_if_dependents_exist(item.used_by())?;
5742 }
5743 drop_ids.push(item.id().into());
5744 }
5745 update_privilege_revokes(
5746 SystemObjectId::Object(item.id().into()),
5747 item.privileges(),
5748 &role_ids,
5749 &mut privilege_revokes,
5750 );
5751 }
5752
5753 for schema in scx.catalog.get_schemas() {
5755 if !schema.id().is_temporary() {
5756 if role_ids.contains(&schema.owner_id()) {
5757 if !cascade {
5758 let non_owned_dependencies: Vec<_> = schema
5759 .item_ids()
5760 .map(|item_id| scx.catalog.get_item(&item_id))
5761 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5762 .filter(|item| !role_ids.contains(&item.owner_id()))
5763 .collect();
5764 if !non_owned_dependencies.is_empty() {
5765 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5766 sql_bail!(
5767 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5768 full_schema_name.to_string().quoted()
5769 );
5770 }
5771 }
5772 drop_ids.push((*schema.database(), *schema.id()).into())
5773 }
5774 update_privilege_revokes(
5775 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5776 schema.privileges(),
5777 &role_ids,
5778 &mut privilege_revokes,
5779 );
5780 }
5781 }
5782
5783 for database in scx.catalog.get_databases() {
5785 if role_ids.contains(&database.owner_id()) {
5786 if !cascade {
5787 let non_owned_schemas: Vec<_> = database
5788 .schemas()
5789 .into_iter()
5790 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5791 .collect();
5792 if !non_owned_schemas.is_empty() {
5793 sql_bail!(
5794 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5795 database.name().quoted(),
5796 );
5797 }
5798 }
5799 drop_ids.push(database.id().into());
5800 }
5801 update_privilege_revokes(
5802 SystemObjectId::Object(database.id().into()),
5803 database.privileges(),
5804 &role_ids,
5805 &mut privilege_revokes,
5806 );
5807 }
5808
5809 update_privilege_revokes(
5811 SystemObjectId::System,
5812 scx.catalog.get_system_privileges(),
5813 &role_ids,
5814 &mut privilege_revokes,
5815 );
5816
5817 for (default_privilege_object, default_privilege_acl_items) in
5818 scx.catalog.get_default_privileges()
5819 {
5820 for default_privilege_acl_item in default_privilege_acl_items {
5821 if role_ids.contains(&default_privilege_object.role_id)
5822 || role_ids.contains(&default_privilege_acl_item.grantee)
5823 {
5824 default_privilege_revokes.push((
5825 default_privilege_object.clone(),
5826 default_privilege_acl_item.clone(),
5827 ));
5828 }
5829 }
5830 }
5831
5832 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5833
5834 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5835 if !system_ids.is_empty() {
5836 let mut owners = system_ids
5837 .into_iter()
5838 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5839 .collect::<BTreeSet<_>>()
5840 .into_iter()
5841 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5842 sql_bail!(
5843 "cannot drop objects owned by role {} because they are required by the database system",
5844 owners.join(", "),
5845 );
5846 }
5847
5848 Ok(Plan::DropOwned(DropOwnedPlan {
5849 role_ids: role_ids.into_iter().collect(),
5850 drop_ids,
5851 privilege_revokes,
5852 default_privilege_revokes,
5853 }))
5854}
5855
5856fn plan_retain_history_option(
5857 scx: &StatementContext,
5858 retain_history: Option<OptionalDuration>,
5859) -> Result<Option<CompactionWindow>, PlanError> {
5860 if let Some(OptionalDuration(lcw)) = retain_history {
5861 Ok(Some(plan_retain_history(scx, lcw)?))
5862 } else {
5863 Ok(None)
5864 }
5865}
5866
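// Plan RETAIN HISTORY: Some(0) is an internal error (per the error text, zero
// durations are expected to be normalized away during option parsing),
// durations below the default compaction window require
// ENABLE_UNLIMITED_RETAIN_HISTORY, and a bare RETAIN HISTORY with no duration
// disables compaction entirely, gated behind the same flag.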
5867fn plan_retain_history(
5873 scx: &StatementContext,
5874 lcw: Option<Duration>,
5875) -> Result<CompactionWindow, PlanError> {
5876 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5877 match lcw {
5878 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5883 option_name: "RETAIN HISTORY".to_string(),
5884 err: Box::new(PlanError::Unstructured(
5885 "internal error: unexpectedly zero".to_string(),
5886 )),
5887 }),
5888 Some(duration) => {
5889 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5892 && scx
5893 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5894 .is_err()
5895 {
5896 return Err(PlanError::RetainHistoryLow {
5897 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5898 });
5899 }
5900 Ok(duration.try_into()?)
5901 }
5902 None => {
5905 if scx
5906 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5907 .is_err()
5908 {
5909 Err(PlanError::RetainHistoryRequired)
5910 } else {
5911 Ok(CompactionWindow::DisableCompaction)
5912 }
5913 }
5914 }
5915}
5916
5917generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5918
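/// Plans `WITH (...)` options on `CREATE INDEX`. Only `RETAIN HISTORY` is
/// carried through into the plan, and any non-empty option list is gated by
/// `ENABLE_INDEX_OPTIONS`. Illustrative usage (hypothetical names):
///
/// ```sql
/// CREATE INDEX idx ON t (a) WITH (RETAIN HISTORY FOR '1 hour');
/// ```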
5919fn plan_index_options(
5920 scx: &StatementContext,
5921 with_opts: Vec<IndexOption<Aug>>,
5922) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5923 if !with_opts.is_empty() {
5924 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5926 }
5927
5928 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5929
5930 let mut out = Vec::with_capacity(1);
5931 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5932 out.push(crate::plan::IndexOption::RetainHistory(cw));
5933 }
5934 Ok(out)
5935}
5936
5937generate_extracted_config!(
5938 TableOption,
5939 (PartitionBy, Vec<Ident>),
5940 (RetainHistory, OptionalDuration),
5941 (RedactedTest, String)
5942);
5943
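/// Plans `WITH (...)` options on `CREATE TABLE`. `PARTITION BY` and the
/// redacted-test option are validated behind feature flags, but only
/// `RETAIN HISTORY` survives into the plan. Illustrative usage
/// (hypothetical names):
///
/// ```sql
/// CREATE TABLE t (a int, ts timestamp)
///     WITH (PARTITION BY (ts), RETAIN HISTORY FOR '7 days');
/// ```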
5944fn plan_table_options(
5945 scx: &StatementContext,
5946 desc: &RelationDesc,
5947 with_opts: Vec<TableOption<Aug>>,
5948) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5949 let TableOptionExtracted {
5950 partition_by,
5951 retain_history,
5952 redacted_test,
5953 ..
5954 }: TableOptionExtracted = with_opts.try_into()?;
5955
5956 if let Some(partition_by) = partition_by {
5957 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5958 check_partition_by(desc, partition_by)?;
5959 }
5960
5961 if redacted_test.is_some() {
5962 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5963 }
5964
5965 let mut out = Vec::with_capacity(1);
5966 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5967 out.push(crate::plan::TableOption::RetainHistory(cw));
5968 }
5969 Ok(out)
5970}
5971
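/// Plans `ALTER INDEX ... SET/RESET (...)`. `RETAIN HISTORY` is the only
/// supported option and must appear alone; `RESET` restores the default
/// compaction window. Illustrative usage (hypothetical names):
///
/// ```sql
/// ALTER INDEX idx SET (RETAIN HISTORY FOR '1 hour');
/// ALTER INDEX idx RESET (RETAIN HISTORY);
/// ```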
5972pub fn plan_alter_index_options(
5973 scx: &mut StatementContext,
5974 AlterIndexStatement {
5975 index_name,
5976 if_exists,
5977 action,
5978 }: AlterIndexStatement<Aug>,
5979) -> Result<Plan, PlanError> {
5980 let object_type = ObjectType::Index;
5981 match action {
5982 AlterIndexAction::ResetOptions(options) => {
5983 let mut options = options.into_iter();
5984 if let Some(opt) = options.next() {
5985 match opt {
5986 IndexOptionName::RetainHistory => {
5987 if options.next().is_some() {
5988                             sql_bail!("RETAIN HISTORY must be the only option");
5989 }
5990 return alter_retain_history(
5991 scx,
5992 object_type,
5993 if_exists,
5994 UnresolvedObjectName::Item(index_name),
5995 None,
5996 );
5997 }
5998 }
5999 }
6000 sql_bail!("expected option");
6001 }
6002 AlterIndexAction::SetOptions(options) => {
6003 let mut options = options.into_iter();
6004 if let Some(opt) = options.next() {
6005 match opt.name {
6006 IndexOptionName::RetainHistory => {
6007 if options.next().is_some() {
6008                             sql_bail!("RETAIN HISTORY must be the only option");
6009 }
6010 return alter_retain_history(
6011 scx,
6012 object_type,
6013 if_exists,
6014 UnresolvedObjectName::Item(index_name),
6015 opt.value,
6016 );
6017 }
6018 }
6019 }
6020 sql_bail!("expected option");
6021 }
6022 }
6023}
6024
6025pub fn describe_alter_cluster_set_options(
6026 _: &StatementContext,
6027 _: AlterClusterStatement<Aug>,
6028) -> Result<StatementDesc, PlanError> {
6029 Ok(StatementDesc::new(None))
6030}
6031
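/// Plans `ALTER CLUSTER`. Managed and unmanaged clusters accept disjoint
/// option sets: `REPLICAS` is rejected on managed clusters, while sizing
/// options (`SIZE`, `REPLICATION FACTOR`, introspection settings, ...) are
/// rejected on unmanaged ones. Illustrative usage (hypothetical cluster and
/// size names):
///
/// ```sql
/// ALTER CLUSTER prod SET (SIZE '400cc', REPLICATION FACTOR 2);
/// ALTER CLUSTER prod RESET (REPLICATION FACTOR);
/// ```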
6032pub fn plan_alter_cluster(
6033 scx: &mut StatementContext,
6034 AlterClusterStatement {
6035 name,
6036 action,
6037 if_exists,
6038 }: AlterClusterStatement<Aug>,
6039) -> Result<Plan, PlanError> {
6040 let cluster = match resolve_cluster(scx, &name, if_exists)? {
6041 Some(entry) => entry,
6042 None => {
6043 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6044 name: name.to_ast_string_simple(),
6045 object_type: ObjectType::Cluster,
6046 });
6047
6048 return Ok(Plan::AlterNoop(AlterNoopPlan {
6049 object_type: ObjectType::Cluster,
6050 }));
6051 }
6052 };
6053
6054 let mut options: PlanClusterOption = Default::default();
6055 let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6056
6057 match action {
6058 AlterClusterAction::SetOptions {
6059 options: set_options,
6060 with_options,
6061 } => {
6062 let ClusterOptionExtracted {
6063 availability_zones,
6064 introspection_debugging,
6065 introspection_interval,
6066 managed,
6067 replicas: replica_defs,
6068 replication_factor,
6069 seen: _,
6070 size,
6071 disk,
6072 schedule,
6073 workload_class,
6074 }: ClusterOptionExtracted = set_options.try_into()?;
6075
6076 if !scx.catalog.active_role_id().is_system() {
6077 if workload_class.is_some() {
6078 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6079 }
6080 }
6081
6082 match managed.unwrap_or_else(|| cluster.is_managed()) {
6083 true => {
6084 let alter_strategy_extracted =
6085 ClusterAlterOptionExtracted::try_from(with_options)?;
6086 alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6087
6088 match alter_strategy {
6089 AlterClusterPlanStrategy::None => {}
6090 _ => {
6091 scx.require_feature_flag(
6092 &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6093 )?;
6094 }
6095 }
6096
6097 if replica_defs.is_some() {
6098 sql_bail!("REPLICAS not supported for managed clusters");
6099 }
6100 if schedule.is_some()
6101 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6102 {
6103 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6104 }
6105
6106 if let Some(replication_factor) = replication_factor {
6107 if schedule.is_some()
6108 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6109 {
6110 sql_bail!(
6111 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6112 );
6113 }
6114 if let Some(current_schedule) = cluster.schedule() {
6115 if !matches!(current_schedule, ClusterSchedule::Manual) {
6116 sql_bail!(
6117 "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6118 );
6119 }
6120 }
6121
6122 let internal_replica_count =
6123 cluster.replicas().iter().filter(|r| r.internal()).count();
6124 let hypothetical_replica_count =
6125 internal_replica_count + usize::cast_from(replication_factor);
6126
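                        // Refuse to grow beyond one replica while the
                        // cluster hosts objects that must run on a single
                        // replica (see contains_single_replica_objects).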
6127 if contains_single_replica_objects(scx, cluster)
6130 && hypothetical_replica_count > 1
6131 {
6132 return Err(PlanError::CreateReplicaFailStorageObjects {
6133 current_replica_count: cluster.replica_ids().iter().count(),
6134 internal_replica_count,
6135 hypothetical_replica_count,
6136 });
6137 }
6138 } else if alter_strategy.is_some() {
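                        // Assumed rationale for doubling: a strategy-driven
                        // (zero-downtime) alter briefly runs the old and new
                        // replica sets side by side.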
6139 let internal_replica_count =
6143 cluster.replicas().iter().filter(|r| r.internal()).count();
6144 let hypothetical_replica_count = internal_replica_count * 2;
6145 if contains_single_replica_objects(scx, cluster) {
6146 return Err(PlanError::CreateReplicaFailStorageObjects {
6147 current_replica_count: cluster.replica_ids().iter().count(),
6148 internal_replica_count,
6149 hypothetical_replica_count,
6150 });
6151 }
6152 }
6153 }
6154 false => {
6155 if !alter_strategy.is_none() {
6156 sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6157 }
6158 if availability_zones.is_some() {
6159 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6160 }
6161 if replication_factor.is_some() {
6162 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6163 }
6164 if introspection_debugging.is_some() {
6165 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6166 }
6167 if introspection_interval.is_some() {
6168 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6169 }
6170 if size.is_some() {
6171 sql_bail!("SIZE not supported for unmanaged clusters");
6172 }
6173 if disk.is_some() {
6174 sql_bail!("DISK not supported for unmanaged clusters");
6175 }
6176 if schedule.is_some()
6177 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6178 {
6179 sql_bail!(
6180 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6181 );
6182 }
6183 if let Some(current_schedule) = cluster.schedule() {
6184 if !matches!(current_schedule, ClusterSchedule::Manual)
6185 && schedule.is_none()
6186 {
6187 sql_bail!(
6188 "when switching a cluster to unmanaged, if the managed \
6189 cluster's SCHEDULE is anything other than MANUAL, you have to \
6190 explicitly set the SCHEDULE to MANUAL"
6191 );
6192 }
6193 }
6194 }
6195 }
6196
6197 let mut replicas = vec![];
6198 for ReplicaDefinition { name, options } in
6199 replica_defs.into_iter().flat_map(Vec::into_iter)
6200 {
6201 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6202 }
6203
6204 if let Some(managed) = managed {
6205 options.managed = AlterOptionParameter::Set(managed);
6206 }
6207 if let Some(replication_factor) = replication_factor {
6208 options.replication_factor = AlterOptionParameter::Set(replication_factor);
6209 }
6210 if let Some(size) = &size {
6211 options.size = AlterOptionParameter::Set(size.clone());
6212 }
6213 if let Some(availability_zones) = availability_zones {
6214 options.availability_zones = AlterOptionParameter::Set(availability_zones);
6215 }
6216 if let Some(introspection_debugging) = introspection_debugging {
6217 options.introspection_debugging =
6218 AlterOptionParameter::Set(introspection_debugging);
6219 }
6220 if let Some(introspection_interval) = introspection_interval {
6221 options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6222 }
6223 if disk.is_some() {
6224 let size = size.as_deref().unwrap_or_else(|| {
6228 cluster.managed_size().expect("cluster known to be managed")
6229 });
6230 if scx.catalog.is_cluster_size_cc(size) {
6231 sql_bail!(
6232 "DISK option not supported for modern cluster sizes because disk is always enabled"
6233 );
6234 }
6235
6236 scx.catalog
6237 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6238 }
6239 if !replicas.is_empty() {
6240 options.replicas = AlterOptionParameter::Set(replicas);
6241 }
6242 if let Some(schedule) = schedule {
6243 options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6244 }
6245 if let Some(workload_class) = workload_class {
6246 options.workload_class = AlterOptionParameter::Set(workload_class.0);
6247 }
6248 }
6249 AlterClusterAction::ResetOptions(reset_options) => {
6250 use AlterOptionParameter::Reset;
6251 use ClusterOptionName::*;
6252
6253 if !scx.catalog.active_role_id().is_system() {
6254 if reset_options.contains(&WorkloadClass) {
6255 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6256 }
6257 }
6258
6259 for option in reset_options {
6260 match option {
6261 AvailabilityZones => options.availability_zones = Reset,
6262 Disk => scx
6263 .catalog
6264 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6265 IntrospectionInterval => options.introspection_interval = Reset,
6266 IntrospectionDebugging => options.introspection_debugging = Reset,
6267 Managed => options.managed = Reset,
6268 Replicas => options.replicas = Reset,
6269 ReplicationFactor => options.replication_factor = Reset,
6270 Size => options.size = Reset,
6271 Schedule => options.schedule = Reset,
6272 WorkloadClass => options.workload_class = Reset,
6273 }
6274 }
6275 }
6276 }
6277 Ok(Plan::AlterCluster(AlterClusterPlan {
6278 id: cluster.id(),
6279 name: cluster.name().to_string(),
6280 options,
6281 strategy: alter_strategy,
6282 }))
6283}
6284
6285pub fn describe_alter_set_cluster(
6286 _: &StatementContext,
6287 _: AlterSetClusterStatement<Aug>,
6288) -> Result<StatementDesc, PlanError> {
6289 Ok(StatementDesc::new(None))
6290}
6291
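/// Plans `ALTER ... SET CLUSTER`, gated by `ENABLE_ALTER_SET_CLUSTER`.
/// Only materialized views are currently supported, and moving an item to
/// the cluster it already runs on plans a no-op. Illustrative usage
/// (hypothetical names):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET CLUSTER analytics;
/// ```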
6292pub fn plan_alter_item_set_cluster(
6293 scx: &StatementContext,
6294 AlterSetClusterStatement {
6295 if_exists,
6296 set_cluster: in_cluster_name,
6297 name,
6298 object_type,
6299 }: AlterSetClusterStatement<Aug>,
6300) -> Result<Plan, PlanError> {
6301 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6302
6303 let object_type = object_type.into();
6304
6305 match object_type {
6307 ObjectType::MaterializedView => {}
6308 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6309 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6310 }
6311 _ => {
6312 bail_never_supported!(
6313 format!("ALTER {object_type} SET CLUSTER"),
6314 "sql/alter-set-cluster/",
6315 format!("{object_type} has no associated cluster")
6316 )
6317 }
6318 }
6319
6320 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6321
6322 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6323 Some(entry) => {
6324 let current_cluster = entry.cluster_id();
6325 let Some(current_cluster) = current_cluster else {
6326                 sql_bail!("no cluster associated with {name}");
6327 };
6328
6329 if current_cluster == in_cluster.id() {
6330 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6331 } else {
6332 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6333 id: entry.id(),
6334 set_cluster: in_cluster.id(),
6335 }))
6336 }
6337 }
6338 None => {
6339 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6340 name: name.to_ast_string_simple(),
6341 object_type,
6342 });
6343
6344 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6345 }
6346 }
6347}
6348
6349pub fn describe_alter_object_rename(
6350 _: &StatementContext,
6351 _: AlterObjectRenameStatement,
6352) -> Result<StatementDesc, PlanError> {
6353 Ok(StatementDesc::new(None))
6354}
6355
6356pub fn plan_alter_object_rename(
6357 scx: &mut StatementContext,
6358 AlterObjectRenameStatement {
6359 name,
6360 object_type,
6361 to_item_name,
6362 if_exists,
6363 }: AlterObjectRenameStatement,
6364) -> Result<Plan, PlanError> {
6365 let object_type = object_type.into();
6366 match (object_type, name) {
6367 (
6368 ObjectType::View
6369 | ObjectType::MaterializedView
6370 | ObjectType::Table
6371 | ObjectType::Source
6372 | ObjectType::Index
6373 | ObjectType::Sink
6374 | ObjectType::Secret
6375 | ObjectType::Connection,
6376 UnresolvedObjectName::Item(name),
6377 ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6378 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6379 plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6380 }
6381 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6382 plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6383 }
6384 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6385 plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6386 }
6387 (object_type, name) => {
6388 unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6389 }
6390 }
6391}
6392
6393pub fn plan_alter_schema_rename(
6394 scx: &mut StatementContext,
6395 name: UnresolvedSchemaName,
6396 to_schema_name: Ident,
6397 if_exists: bool,
6398) -> Result<Plan, PlanError> {
6399 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6400 let object_type = ObjectType::Schema;
6401 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6402 name: name.to_ast_string_simple(),
6403 object_type,
6404 });
6405 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6406 };
6407
6408 if scx
6410 .resolve_schema_in_database(&db_spec, &to_schema_name)
6411 .is_ok()
6412 {
6413 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6414 to_schema_name.clone().into_string(),
6415 )));
6416 }
6417
6418 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6420 if schema.id().is_system() {
6421 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6422 }
6423
6424 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6425 cur_schema_spec: (db_spec, schema_spec),
6426 new_schema_name: to_schema_name.into_string(),
6427 }))
6428}
6429
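/// Plans `ALTER SCHEMA ... SWAP WITH ...`. Both schemas must live in the
/// same (non-ambient) database and neither may be a system schema. The plan
/// carries a collision-free temporary name of the form
/// `mz_schema_swap_<suffix>`; the assumption encoded here is that the
/// sequencer rotates names through it (a -> temp, b -> a, temp -> b) so the
/// swap never passes through a duplicate-name state.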
6430pub fn plan_alter_schema_swap<F>(
6431 scx: &mut StatementContext,
6432 name_a: UnresolvedSchemaName,
6433 name_b: Ident,
6434 gen_temp_suffix: F,
6435) -> Result<Plan, PlanError>
6436where
6437 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6438{
6439 let schema_a = scx.resolve_schema(name_a.clone())?;
6440
6441 let db_spec = schema_a.database().clone();
6442 if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6443 sql_bail!("cannot swap schemas that are in the ambient database");
6444 };
6445 let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6446
6447 if schema_a.id().is_system() || schema_b.id().is_system() {
6449 bail_never_supported!("swapping a system schema".to_string())
6450 }
6451
6452 let check = |temp_suffix: &str| {
6456 let mut temp_name = ident!("mz_schema_swap_");
6457 temp_name.append_lossy(temp_suffix);
6458 scx.resolve_schema_in_database(&db_spec, &temp_name)
6459 .is_err()
6460 };
6461 let temp_suffix = gen_temp_suffix(&check)?;
6462 let name_temp = format!("mz_schema_swap_{temp_suffix}");
6463
6464 Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6465 schema_a_spec: (*schema_a.database(), *schema_a.id()),
6466 schema_a_name: schema_a.name().schema.to_string(),
6467 schema_b_spec: (*schema_b.database(), *schema_b.id()),
6468 schema_b_name: schema_b.name().schema.to_string(),
6469 name_temp,
6470 }))
6471}
6472
6473pub fn plan_alter_item_rename(
6474 scx: &mut StatementContext,
6475 object_type: ObjectType,
6476 name: UnresolvedItemName,
6477 to_item_name: Ident,
6478 if_exists: bool,
6479) -> Result<Plan, PlanError> {
6480 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6481 Ok(r) => r,
6482 Err(PlanError::MismatchedObjectType {
6484 name,
6485 is_type: ObjectType::MaterializedView,
6486 expected_type: ObjectType::View,
6487 }) => {
6488 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6489 }
6490 e => e?,
6491 };
6492
6493 match resolved {
6494 Some(entry) => {
6495 let full_name = scx.catalog.resolve_full_name(entry.name());
6496 let item_type = entry.item_type();
6497
6498 let proposed_name = QualifiedItemName {
6499 qualifiers: entry.name().qualifiers.clone(),
6500 item: to_item_name.clone().into_string(),
6501 };
6502
6503 let conflicting_type_exists;
6507 let conflicting_item_exists;
6508 if item_type == CatalogItemType::Type {
6509 conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6510 conflicting_item_exists = scx
6511 .catalog
6512 .get_item_by_name(&proposed_name)
6513 .map(|item| item.item_type().conflicts_with_type())
6514 .unwrap_or(false);
6515 } else {
6516 conflicting_type_exists = item_type.conflicts_with_type()
6517 && scx.catalog.get_type_by_name(&proposed_name).is_some();
6518 conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6519 };
6520 if conflicting_type_exists || conflicting_item_exists {
6521 sql_bail!("catalog item '{}' already exists", to_item_name);
6522 }
6523
6524 Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6525 id: entry.id(),
6526 current_full_name: full_name,
6527 to_name: normalize::ident(to_item_name),
6528 object_type,
6529 }))
6530 }
6531 None => {
6532 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6533 name: name.to_ast_string_simple(),
6534 object_type,
6535 });
6536
6537 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6538 }
6539 }
6540}
6541
6542pub fn plan_alter_cluster_rename(
6543 scx: &mut StatementContext,
6544 object_type: ObjectType,
6545 name: Ident,
6546 to_name: Ident,
6547 if_exists: bool,
6548) -> Result<Plan, PlanError> {
6549 match resolve_cluster(scx, &name, if_exists)? {
6550 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6551 id: entry.id(),
6552 name: entry.name().to_string(),
6553             to_name: normalize::ident(to_name),
6554 })),
6555 None => {
6556 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6557 name: name.to_ast_string_simple(),
6558 object_type,
6559 });
6560
6561 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6562 }
6563 }
6564}
6565
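/// Plans `ALTER CLUSTER ... SWAP WITH ...` using the same temporary-name
/// rotation as schema swaps, probing `mz_cluster_swap_<suffix>` until a
/// name resolves to no existing cluster.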
6566pub fn plan_alter_cluster_swap<F>(
6567 scx: &mut StatementContext,
6568 name_a: Ident,
6569 name_b: Ident,
6570 gen_temp_suffix: F,
6571) -> Result<Plan, PlanError>
6572where
6573 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6574{
6575 let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6576 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6577
6578 let check = |temp_suffix: &str| {
6579         let mut temp_name = ident!("mz_cluster_swap_");
6580 temp_name.append_lossy(temp_suffix);
6581 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6582 Err(CatalogError::UnknownCluster(_)) => true,
6584 Ok(_) | Err(_) => false,
6586 }
6587 };
6588 let temp_suffix = gen_temp_suffix(&check)?;
6589 let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6590
6591 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6592 id_a: cluster_a.id(),
6593 id_b: cluster_b.id(),
6594 name_a: name_a.into_string(),
6595 name_b: name_b.into_string(),
6596 name_temp,
6597 }))
6598}
6599
6600pub fn plan_alter_cluster_replica_rename(
6601 scx: &mut StatementContext,
6602 object_type: ObjectType,
6603 name: QualifiedReplica,
6604 to_item_name: Ident,
6605 if_exists: bool,
6606) -> Result<Plan, PlanError> {
6607 match resolve_cluster_replica(scx, &name, if_exists)? {
6608 Some((cluster, replica)) => {
6609 ensure_cluster_is_not_managed(scx, cluster.id())?;
6610 Ok(Plan::AlterClusterReplicaRename(
6611 AlterClusterReplicaRenamePlan {
6612 cluster_id: cluster.id(),
6613 replica_id: replica,
6614 name: QualifiedReplica {
6615 cluster: Ident::new(cluster.name())?,
6616 replica: name.replica,
6617 },
6618 to_name: normalize::ident(to_item_name),
6619 },
6620 ))
6621 }
6622 None => {
6623 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6624 name: name.to_ast_string_simple(),
6625 object_type,
6626 });
6627
6628 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6629 }
6630 }
6631}
6632
6633pub fn describe_alter_object_swap(
6634 _: &StatementContext,
6635 _: AlterObjectSwapStatement,
6636) -> Result<StatementDesc, PlanError> {
6637 Ok(StatementDesc::new(None))
6638}
6639
6640pub fn plan_alter_object_swap(
6641 scx: &mut StatementContext,
6642 stmt: AlterObjectSwapStatement,
6643) -> Result<Plan, PlanError> {
6644 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6645
6646 let AlterObjectSwapStatement {
6647 object_type,
6648 name_a,
6649 name_b,
6650 } = stmt;
6651 let object_type = object_type.into();
6652
6653 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6655 let mut attempts = 0;
6656 let name_temp = loop {
6657 attempts += 1;
6658 if attempts > 10 {
6659 tracing::warn!("Unable to generate temp id for swapping");
6660                 sql_bail!("unable to generate a temporary name for the swap");
6661 }
6662
6663 let short_id = mz_ore::id_gen::temp_id();
6665 if check_fn(&short_id) {
6666 break short_id;
6667 }
6668 };
6669
6670 Ok(name_temp)
6671 };
6672
6673 match (object_type, name_a, name_b) {
6674 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6675 plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6676 }
6677 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6678 plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6679 }
6680 (object_type, _, _) => Err(PlanError::Unsupported {
6681 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6682 discussion_no: None,
6683 }),
6684 }
6685}
6686
6687pub fn describe_alter_retain_history(
6688 _: &StatementContext,
6689 _: AlterRetainHistoryStatement<Aug>,
6690) -> Result<StatementDesc, PlanError> {
6691 Ok(StatementDesc::new(None))
6692}
6693
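/// Plans `ALTER ... SET/RESET (RETAIN HISTORY ...)` for relations that
/// support it. A `RESET` (no value) falls back to the default logical
/// compaction window. Illustrative usage (hypothetical names):
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '1 hour');
/// ALTER SOURCE src RESET (RETAIN HISTORY);
/// ```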
6694pub fn plan_alter_retain_history(
6695 scx: &StatementContext,
6696 AlterRetainHistoryStatement {
6697 object_type,
6698 if_exists,
6699 name,
6700 history,
6701 }: AlterRetainHistoryStatement<Aug>,
6702) -> Result<Plan, PlanError> {
6703 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6704}
6705
6706fn alter_retain_history(
6707 scx: &StatementContext,
6708 object_type: ObjectType,
6709 if_exists: bool,
6710 name: UnresolvedObjectName,
6711 history: Option<WithOptionValue<Aug>>,
6712) -> Result<Plan, PlanError> {
6713 let name = match (object_type, name) {
6714 (
6715 ObjectType::View
6717 | ObjectType::MaterializedView
6718 | ObjectType::Table
6719 | ObjectType::Source
6720 | ObjectType::Index,
6721 UnresolvedObjectName::Item(name),
6722 ) => name,
6723 (object_type, _) => {
6724 sql_bail!("{object_type} does not support RETAIN HISTORY")
6725 }
6726 };
6727 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6728 Some(entry) => {
6729 let full_name = scx.catalog.resolve_full_name(entry.name());
6730 let item_type = entry.item_type();
6731
6732 if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6734 return Err(PlanError::AlterViewOnMaterializedView(
6735 full_name.to_string(),
6736 ));
6737 } else if object_type == ObjectType::View {
6738 sql_bail!("{object_type} does not support RETAIN HISTORY")
6739 } else if object_type != item_type {
6740 sql_bail!(
6741 "\"{}\" is a {} not a {}",
6742 full_name,
6743 entry.item_type(),
6744 format!("{object_type}").to_lowercase()
6745 )
6746 }
6747
6748 let (value, lcw) = match &history {
6750 Some(WithOptionValue::RetainHistoryFor(value)) => {
6751 let window = OptionalDuration::try_from_value(value.clone())?;
6752 (Some(value.clone()), window.0)
6753 }
6754 None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6756 _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6757 };
6758 let window = plan_retain_history(scx, lcw)?;
6759
6760 Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6761 id: entry.id(),
6762 value,
6763 window,
6764 object_type,
6765 }))
6766 }
6767 None => {
6768 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6769 name: name.to_ast_string_simple(),
6770 object_type,
6771 });
6772
6773 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6774 }
6775 }
6776}
6777
6778pub fn describe_alter_secret_options(
6779 _: &StatementContext,
6780 _: AlterSecretStatement<Aug>,
6781) -> Result<StatementDesc, PlanError> {
6782 Ok(StatementDesc::new(None))
6783}
6784
6785pub fn plan_alter_secret(
6786 scx: &mut StatementContext,
6787 stmt: AlterSecretStatement<Aug>,
6788) -> Result<Plan, PlanError> {
6789 let AlterSecretStatement {
6790 name,
6791 if_exists,
6792 value,
6793 } = stmt;
6794 let object_type = ObjectType::Secret;
6795 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6796 Some(entry) => entry.id(),
6797 None => {
6798 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6799 name: name.to_string(),
6800 object_type,
6801 });
6802
6803 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6804 }
6805 };
6806
6807 let secret_as = query::plan_secret_as(scx, value)?;
6808
6809 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6810}
6811
6812pub fn describe_alter_connection(
6813 _: &StatementContext,
6814 _: AlterConnectionStatement<Aug>,
6815) -> Result<StatementDesc, PlanError> {
6816 Ok(StatementDesc::new(None))
6817}
6818
6819generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6820
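/// Plans `ALTER CONNECTION`. `ROTATE KEYS` must appear alone and only
/// applies to SSH connections; otherwise SET/DROP actions are checked
/// against the connection type's permitted options, and `VALIDATE` (behind
/// `ENABLE_CONNECTION_VALIDATION_SYNTAX`) controls whether the new
/// configuration is test-connected. Illustrative usage (hypothetical names
/// and option values):
///
/// ```sql
/// ALTER CONNECTION ssh_conn ROTATE KEYS;
/// ALTER CONNECTION kafka_conn SET (BROKER 'broker:9092') WITH (VALIDATE = true);
/// ```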
6821pub fn plan_alter_connection(
6822 scx: &StatementContext,
6823 stmt: AlterConnectionStatement<Aug>,
6824) -> Result<Plan, PlanError> {
6825 let AlterConnectionStatement {
6826 name,
6827 if_exists,
6828 actions,
6829 with_options,
6830 } = stmt;
6831 let conn_name = normalize::unresolved_item_name(name)?;
6832 let entry = match scx.catalog.resolve_item(&conn_name) {
6833 Ok(entry) => entry,
6834 Err(_) if if_exists => {
6835 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6836 name: conn_name.to_string(),
6837                 object_type: ObjectType::Connection,
6838 });
6839
6840 return Ok(Plan::AlterNoop(AlterNoopPlan {
6841 object_type: ObjectType::Connection,
6842 }));
6843 }
6844 Err(e) => return Err(e.into()),
6845 };
6846
6847 let connection = entry.connection()?;
6848
6849 if actions
6850 .iter()
6851 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6852 {
6853 if actions.len() > 1 {
6854 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6855 }
6856
6857 if !with_options.is_empty() {
6858 sql_bail!(
6859 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6860 with_options
6861 .iter()
6862 .map(|o| o.to_ast_string_simple())
6863 .join(", ")
6864 );
6865 }
6866
6867 if !matches!(connection, Connection::Ssh(_)) {
6868 sql_bail!(
6869 "{} is not an SSH connection",
6870 scx.catalog.resolve_full_name(entry.name())
6871 )
6872 }
6873
6874 return Ok(Plan::AlterConnection(AlterConnectionPlan {
6875 id: entry.id(),
6876 action: crate::plan::AlterConnectionAction::RotateKeys,
6877 }));
6878 }
6879
6880 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6881 if options.validate.is_some() {
6882 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6883 }
6884
6885 let validate = match options.validate {
6886 Some(val) => val,
6887 None => {
6888 scx.catalog
6889 .system_vars()
6890 .enable_default_connection_validation()
6891 && connection.validate_by_default()
6892 }
6893 };
6894
6895 let connection_type = match connection {
6896 Connection::Aws(_) => CreateConnectionType::Aws,
6897 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6898 Connection::Kafka(_) => CreateConnectionType::Kafka,
6899 Connection::Csr(_) => CreateConnectionType::Csr,
6900 Connection::Postgres(_) => CreateConnectionType::Postgres,
6901 Connection::Ssh(_) => CreateConnectionType::Ssh,
6902 Connection::MySql(_) => CreateConnectionType::MySql,
6903 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6904 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
6905 };
6906
6907 let specified_options: BTreeSet<_> = actions
6909 .iter()
6910 .map(|action: &AlterConnectionAction<Aug>| match action {
6911 AlterConnectionAction::SetOption(option) => option.name.clone(),
6912 AlterConnectionAction::DropOption(name) => name.clone(),
6913 AlterConnectionAction::RotateKeys => unreachable!(),
6914 })
6915 .collect();
6916
6917 for invalid in INALTERABLE_OPTIONS {
6918 if specified_options.contains(invalid) {
6919 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6920 }
6921 }
6922
6923 connection::validate_options_per_connection_type(connection_type, specified_options)?;
6924
6925 let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
6927 actions.into_iter().partition_map(|action| match action {
6928 AlterConnectionAction::SetOption(option) => Either::Left(option),
6929 AlterConnectionAction::DropOption(name) => Either::Right(name),
6930 AlterConnectionAction::RotateKeys => unreachable!(),
6931 });
6932
6933 let set_options: BTreeMap<_, _> = set_options_vec
6934 .clone()
6935 .into_iter()
6936 .map(|option| (option.name, option.value))
6937 .collect();
6938
6939 let connection_options_extracted =
6943 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
6944
6945 let duplicates: Vec<_> = connection_options_extracted
6946 .seen
6947 .intersection(&drop_options)
6948 .collect();
6949
6950 if !duplicates.is_empty() {
6951 sql_bail!(
6952 "cannot both SET and DROP/RESET options {}",
6953 duplicates
6954 .iter()
6955 .map(|option| option.to_string())
6956 .join(", ")
6957 )
6958 }
6959
6960 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
6961 let set_options_count = mutually_exclusive_options
6962 .iter()
6963 .filter(|o| set_options.contains_key(o))
6964 .count();
6965 let drop_options_count = mutually_exclusive_options
6966 .iter()
6967 .filter(|o| drop_options.contains(o))
6968 .count();
6969
6970 if set_options_count > 0 && drop_options_count > 0 {
6972 sql_bail!(
6973 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
6974 connection_type,
6975 mutually_exclusive_options
6976 .iter()
6977 .map(|option| option.to_string())
6978 .join(", ")
6979 )
6980 }
6981
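        // Setting any option from a mutually exclusive set implicitly
        // resets the set's other options, so fold the whole set into the
        // options to drop.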
6982 if set_options_count > 0 || drop_options_count > 0 {
6987 drop_options.extend(mutually_exclusive_options.iter().cloned());
6988 }
6989
6990 }
6993
6994 Ok(Plan::AlterConnection(AlterConnectionPlan {
6995 id: entry.id(),
6996 action: crate::plan::AlterConnectionAction::AlterOptions {
6997 set_options,
6998 drop_options,
6999 validate,
7000 },
7001 }))
7002}
7003
7004pub fn describe_alter_sink(
7005 _: &StatementContext,
7006 _: AlterSinkStatement<Aug>,
7007) -> Result<StatementDesc, PlanError> {
7008 Ok(StatementDesc::new(None))
7009}
7010
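/// Plans `ALTER SINK`. Changing the sink's input relation is implemented by
/// re-parsing the sink's original `CREATE SINK` statement, swapping in the
/// new `FROM` relation, re-planning it, and bumping the sink version.
/// Illustrative usage (hypothetical names):
///
/// ```sql
/// ALTER SINK snk SET FROM other_relation;
/// ```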
7011pub fn plan_alter_sink(
7012 scx: &mut StatementContext,
7013 stmt: AlterSinkStatement<Aug>,
7014) -> Result<Plan, PlanError> {
7015 let AlterSinkStatement {
7016 sink_name,
7017 if_exists,
7018 action,
7019 } = stmt;
7020
7021 let object_type = ObjectType::Sink;
7022 let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7023
7024 let Some(item) = item else {
7025 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7026 name: sink_name.to_string(),
7027 object_type,
7028 });
7029
7030 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7031 };
7032 let item = item.at_version(RelationVersionSelector::Latest);
7034
7035 match action {
7036 AlterSinkAction::ChangeRelation(new_from) => {
7037 let create_sql = item.create_sql();
7039 let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7040 let [stmt]: [StatementParseResult; 1] = stmts
7041 .try_into()
7042 .expect("create sql of sink was not exactly one statement");
7043 let Statement::CreateSink(stmt) = stmt.ast else {
7044 unreachable!("invalid create SQL for sink item");
7045 };
7046
7047 let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7049 stmt.from = new_from;
7050
7051 let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7053 unreachable!("invalid plan for CREATE SINK statement");
7054 };
7055
7056 plan.sink.version += 1;
7057
7058 Ok(Plan::AlterSink(AlterSinkPlan {
7059 item_id: item.id(),
7060 global_id: item.global_id(),
7061 sink: plan.sink,
7062 with_snapshot: plan.with_snapshot,
7063 in_cluster: plan.in_cluster,
7064 }))
7065 }
7066 AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7067 AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7068 }
7069}
7070
7071pub fn describe_alter_source(
7072 _: &StatementContext,
7073 _: AlterSourceStatement<Aug>,
7074) -> Result<StatementDesc, PlanError> {
7075 Ok(StatementDesc::new(None))
7077}
7078
7079generate_extracted_config!(
7080 AlterSourceAddSubsourceOption,
7081 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7082 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7083 (Details, String)
7084);
7085
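/// Plans `ALTER SOURCE`. Only `RETAIN HISTORY` may be SET or RESET here;
/// adding subsources and refreshing references are handled during
/// purification, and dropping subsources now goes through `DROP SOURCE`.
/// Illustrative usage (hypothetical name):
///
/// ```sql
/// ALTER SOURCE src SET (RETAIN HISTORY FOR '2 days');
/// ```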
7086pub fn plan_alter_source(
7087 scx: &mut StatementContext,
7088 stmt: AlterSourceStatement<Aug>,
7089) -> Result<Plan, PlanError> {
7090 let AlterSourceStatement {
7091 source_name,
7092 if_exists,
7093 action,
7094 } = stmt;
7095 let object_type = ObjectType::Source;
7096
7097 if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7098 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7099 name: source_name.to_string(),
7100 object_type,
7101 });
7102
7103 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7104 }
7105
7106 match action {
7107 AlterSourceAction::SetOptions(options) => {
7108 let mut options = options.into_iter();
7109             let option = options.next().expect("SET must specify at least one option");
7110 if option.name == CreateSourceOptionName::RetainHistory {
7111 if options.next().is_some() {
7112                     sql_bail!("RETAIN HISTORY must be the only option");
7113 }
7114 return alter_retain_history(
7115 scx,
7116 object_type,
7117 if_exists,
7118 UnresolvedObjectName::Item(source_name),
7119 option.value,
7120 );
7121 }
7122 sql_bail!(
7125 "Cannot modify the {} of a SOURCE.",
7126 option.name.to_ast_string_simple()
7127 );
7128 }
7129 AlterSourceAction::ResetOptions(reset) => {
7130 let mut options = reset.into_iter();
7131             let option = options.next().expect("RESET must specify at least one option");
7132 if option == CreateSourceOptionName::RetainHistory {
7133 if options.next().is_some() {
7134                     sql_bail!("RETAIN HISTORY must be the only option");
7135 }
7136 return alter_retain_history(
7137 scx,
7138 object_type,
7139 if_exists,
7140 UnresolvedObjectName::Item(source_name),
7141 None,
7142 );
7143 }
7144             sql_bail!(
7145                 "cannot modify the {} of a SOURCE",
7146                 option.to_ast_string_simple()
7147             );
7148 }
7149 AlterSourceAction::DropSubsources { .. } => {
7150 sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7151 }
7152 AlterSourceAction::AddSubsources { .. } => {
7153 unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7154 }
7155 AlterSourceAction::RefreshReferences => {
7156 unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7157 }
7158 };
7159}
7160
7161pub fn describe_alter_system_set(
7162 _: &StatementContext,
7163 _: AlterSystemSetStatement,
7164) -> Result<StatementDesc, PlanError> {
7165 Ok(StatementDesc::new(None))
7166}
7167
7168pub fn plan_alter_system_set(
7169 _: &StatementContext,
7170 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7171) -> Result<Plan, PlanError> {
7172 let name = name.to_string();
7173 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7174 name,
7175 value: scl::plan_set_variable_to(to)?,
7176 }))
7177}
7178
7179pub fn describe_alter_system_reset(
7180 _: &StatementContext,
7181 _: AlterSystemResetStatement,
7182) -> Result<StatementDesc, PlanError> {
7183 Ok(StatementDesc::new(None))
7184}
7185
7186pub fn plan_alter_system_reset(
7187 _: &StatementContext,
7188 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7189) -> Result<Plan, PlanError> {
7190 let name = name.to_string();
7191 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7192}
7193
7194pub fn describe_alter_system_reset_all(
7195 _: &StatementContext,
7196 _: AlterSystemResetAllStatement,
7197) -> Result<StatementDesc, PlanError> {
7198 Ok(StatementDesc::new(None))
7199}
7200
7201pub fn plan_alter_system_reset_all(
7202 _: &StatementContext,
7203 _: AlterSystemResetAllStatement,
7204) -> Result<Plan, PlanError> {
7205 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7206}
7207
7208pub fn describe_alter_role(
7209 _: &StatementContext,
7210 _: AlterRoleStatement<Aug>,
7211) -> Result<StatementDesc, PlanError> {
7212 Ok(StatementDesc::new(None))
7213}
7214
7215pub fn plan_alter_role(
7216 _scx: &StatementContext,
7217 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7218) -> Result<Plan, PlanError> {
7219 let option = match option {
7220 AlterRoleOption::Attributes(attrs) => {
7221 let attrs = plan_role_attributes(attrs)?;
7222 PlannedAlterRoleOption::Attributes(attrs)
7223 }
7224 AlterRoleOption::Variable(variable) => {
7225 let var = plan_role_variable(variable)?;
7226 PlannedAlterRoleOption::Variable(var)
7227 }
7228 };
7229
7230 Ok(Plan::AlterRole(AlterRolePlan {
7231 id: name.id,
7232 name: name.name,
7233 option,
7234 }))
7235}
7236
7237pub fn describe_alter_table_add_column(
7238 _: &StatementContext,
7239 _: AlterTableAddColumnStatement<Aug>,
7240) -> Result<StatementDesc, PlanError> {
7241 Ok(StatementDesc::new(None))
7242}
7243
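/// Plans `ALTER TABLE ... ADD COLUMN`, gated by
/// `ENABLE_ALTER_TABLE_ADD_COLUMN`. New columns are always planned as
/// nullable, and `IF NOT EXISTS` turns a duplicate column into a notice
/// plus a no-op plan. Illustrative usage (hypothetical names):
///
/// ```sql
/// ALTER TABLE t ADD COLUMN IF NOT EXISTS c text;
/// ```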
7244pub fn plan_alter_table_add_column(
7245 scx: &StatementContext,
7246 stmt: AlterTableAddColumnStatement<Aug>,
7247) -> Result<Plan, PlanError> {
7248 let AlterTableAddColumnStatement {
7249 if_exists,
7250 name,
7251 if_col_not_exist,
7252 column_name,
7253 data_type,
7254 } = stmt;
7255 let object_type = ObjectType::Table;
7256
7257 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7258
7259 let (relation_id, item_name, desc) =
7260 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7261 Some(item) => {
7262 let item_name = scx.catalog.resolve_full_name(item.name());
7264 let item = item.at_version(RelationVersionSelector::Latest);
7265 let desc = item.desc(&item_name)?.into_owned();
7266 (item.id(), item_name, desc)
7267 }
7268 None => {
7269 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7270 name: name.to_ast_string_simple(),
7271 object_type,
7272 });
7273 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7274 }
7275 };
7276
7277 let column_name = ColumnName::from(column_name.as_str());
7278 if desc.get_by_name(&column_name).is_some() {
7279 if if_col_not_exist {
7280 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7281 column_name: column_name.to_string(),
7282 object_name: item_name.item,
7283 });
7284 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7285 } else {
7286 return Err(PlanError::ColumnAlreadyExists {
7287 column_name,
7288 object_name: item_name.item,
7289 });
7290 }
7291 }
7292
7293 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7294 let column_type = scalar_type.nullable(true);
7296 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7298
7299 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7300 relation_id,
7301 column_name,
7302 column_type,
7303 raw_sql_type,
7304 }))
7305}
7306
7307pub fn describe_comment(
7308 _: &StatementContext,
7309 _: CommentStatement<Aug>,
7310) -> Result<StatementDesc, PlanError> {
7311 Ok(StatementDesc::new(None))
7312}
7313
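/// Plans `COMMENT ON ...`. Comments are capped at `MAX_COMMENT_LENGTH`
/// bytes, and column comments record the 1-based column position as the
/// comment's sub-component. Illustrative usage (hypothetical names):
///
/// ```sql
/// COMMENT ON TABLE t IS 'fact table';
/// COMMENT ON COLUMN t.c IS 'customer id';
/// ```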
7314pub fn plan_comment(
7315 scx: &mut StatementContext,
7316 stmt: CommentStatement<Aug>,
7317) -> Result<Plan, PlanError> {
7318 const MAX_COMMENT_LENGTH: usize = 1024;
7319
7320 let CommentStatement { object, comment } = stmt;
7321
7322 if let Some(c) = &comment {
7324         if c.len() > MAX_COMMENT_LENGTH {
7325 return Err(PlanError::CommentTooLong {
7326 length: c.len(),
7327 max_size: MAX_COMMENT_LENGTH,
7328 });
7329 }
7330 }
7331
7332 let (object_id, column_pos) = match &object {
7333 com_ty @ CommentObjectType::Table { name }
7334 | com_ty @ CommentObjectType::View { name }
7335 | com_ty @ CommentObjectType::MaterializedView { name }
7336 | com_ty @ CommentObjectType::Index { name }
7337 | com_ty @ CommentObjectType::Func { name }
7338 | com_ty @ CommentObjectType::Connection { name }
7339 | com_ty @ CommentObjectType::Source { name }
7340 | com_ty @ CommentObjectType::Sink { name }
7341 | com_ty @ CommentObjectType::Secret { name }
7342 | com_ty @ CommentObjectType::ContinualTask { name } => {
7343 let item = scx.get_item_by_resolved_name(name)?;
7344 match (com_ty, item.item_type()) {
7345 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7346 (CommentObjectId::Table(item.id()), None)
7347 }
7348 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7349 (CommentObjectId::View(item.id()), None)
7350 }
7351 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7352 (CommentObjectId::MaterializedView(item.id()), None)
7353 }
7354 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7355 (CommentObjectId::Index(item.id()), None)
7356 }
7357 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7358 (CommentObjectId::Func(item.id()), None)
7359 }
7360 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7361 (CommentObjectId::Connection(item.id()), None)
7362 }
7363 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7364 (CommentObjectId::Source(item.id()), None)
7365 }
7366 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7367 (CommentObjectId::Sink(item.id()), None)
7368 }
7369 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7370 (CommentObjectId::Secret(item.id()), None)
7371 }
7372 (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7373 (CommentObjectId::ContinualTask(item.id()), None)
7374 }
7375 (com_ty, cat_ty) => {
7376 let expected_type = match com_ty {
7377 CommentObjectType::Table { .. } => ObjectType::Table,
7378 CommentObjectType::View { .. } => ObjectType::View,
7379 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7380 CommentObjectType::Index { .. } => ObjectType::Index,
7381 CommentObjectType::Func { .. } => ObjectType::Func,
7382 CommentObjectType::Connection { .. } => ObjectType::Connection,
7383 CommentObjectType::Source { .. } => ObjectType::Source,
7384 CommentObjectType::Sink { .. } => ObjectType::Sink,
7385 CommentObjectType::Secret { .. } => ObjectType::Secret,
7386 _ => unreachable!("these are the only types we match on"),
7387 };
7388
7389 return Err(PlanError::InvalidObjectType {
7390 expected_type: SystemObjectType::Object(expected_type),
7391 actual_type: SystemObjectType::Object(cat_ty.into()),
7392 object_name: item.name().item.clone(),
7393 });
7394 }
7395 }
7396 }
7397 CommentObjectType::Type { ty } => match ty {
7398 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7399 sql_bail!("cannot comment on anonymous list or map type");
7400 }
7401 ResolvedDataType::Named { id, modifiers, .. } => {
7402 if !modifiers.is_empty() {
7403 sql_bail!("cannot comment on type with modifiers");
7404 }
7405 (CommentObjectId::Type(*id), None)
7406 }
7407 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7408 },
7409 CommentObjectType::Column { name } => {
7410 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7411 match item.item_type() {
7412 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7413 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7414 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7415 CatalogItemType::MaterializedView => {
7416 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7417 }
7418 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7419 r => {
7420 return Err(PlanError::Unsupported {
7421 feature: format!("Specifying comments on a column of {r}"),
7422 discussion_no: None,
7423 });
7424 }
7425 }
7426 }
7427 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7428 CommentObjectType::Database { name } => {
7429 (CommentObjectId::Database(*name.database_id()), None)
7430 }
7431 CommentObjectType::Schema { name } => (
7432 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7433 None,
7434 ),
7435 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7436 CommentObjectType::ClusterReplica { name } => {
7437 let replica = scx.catalog.resolve_cluster_replica(name)?;
7438 (
7439 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7440 None,
7441 )
7442 }
7443 CommentObjectType::NetworkPolicy { name } => {
7444 (CommentObjectId::NetworkPolicy(name.id), None)
7445 }
7446 };
7447
7448 if let Some(p) = column_pos {
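        // Column positions are surfaced to clients as i32 (assumed for
        // PostgreSQL catalog compatibility), so reject positions that
        // cannot fit.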
7454 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7455 max_num_columns: MAX_NUM_COLUMNS,
7456 req_num_columns: p,
7457 })?;
7458 }
7459
7460 Ok(Plan::Comment(CommentPlan {
7461 object_id,
7462 sub_component: column_pos,
7463 comment,
7464 }))
7465}
7466
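/// Resolves a cluster by name, mapping "not found" to `Ok(None)` when
/// `if_exists` is set, so that callers can emit an `ObjectDoesNotExist`
/// notice and plan a no-op instead of failing. The other `resolve_*`
/// helpers below follow the same convention.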
7467pub(crate) fn resolve_cluster<'a>(
7468 scx: &'a StatementContext,
7469 name: &'a Ident,
7470 if_exists: bool,
7471) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7472 match scx.resolve_cluster(Some(name)) {
7473 Ok(cluster) => Ok(Some(cluster)),
7474 Err(_) if if_exists => Ok(None),
7475 Err(e) => Err(e),
7476 }
7477}
7478
7479pub(crate) fn resolve_cluster_replica<'a>(
7480 scx: &'a StatementContext,
7481 name: &QualifiedReplica,
7482 if_exists: bool,
7483) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7484 match scx.resolve_cluster(Some(&name.cluster)) {
7485 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7486 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7487 None if if_exists => Ok(None),
7488 None => Err(sql_err!(
7489 "CLUSTER {} has no CLUSTER REPLICA named {}",
7490 cluster.name(),
7491 name.replica.as_str().quoted(),
7492 )),
7493 },
7494 Err(_) if if_exists => Ok(None),
7495 Err(e) => Err(e),
7496 }
7497}
7498
7499pub(crate) fn resolve_database<'a>(
7500 scx: &'a StatementContext,
7501 name: &'a UnresolvedDatabaseName,
7502 if_exists: bool,
7503) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7504 match scx.resolve_database(name) {
7505 Ok(database) => Ok(Some(database)),
7506 Err(_) if if_exists => Ok(None),
7507 Err(e) => Err(e),
7508 }
7509}
7510
7511pub(crate) fn resolve_schema<'a>(
7512 scx: &'a StatementContext,
7513 name: UnresolvedSchemaName,
7514 if_exists: bool,
7515) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7516 match scx.resolve_schema(name) {
7517 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7518 Err(_) if if_exists => Ok(None),
7519 Err(e) => Err(e),
7520 }
7521}
7522
7523pub(crate) fn resolve_network_policy<'a>(
7524 scx: &'a StatementContext,
7525 name: Ident,
7526 if_exists: bool,
7527) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7528 match scx.catalog.resolve_network_policy(&name.to_string()) {
7529 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7530 id: policy.id(),
7531 name: policy.name().to_string(),
7532 })),
7533 Err(_) if if_exists => Ok(None),
7534 Err(e) => Err(e.into()),
7535 }
7536}
7537
7538pub(crate) fn resolve_item_or_type<'a>(
7539 scx: &'a StatementContext,
7540 object_type: ObjectType,
7541 name: UnresolvedItemName,
7542 if_exists: bool,
7543) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7544 let name = normalize::unresolved_item_name(name)?;
7545 let catalog_item = match object_type {
7546 ObjectType::Type => scx.catalog.resolve_type(&name),
7547 _ => scx.catalog.resolve_item(&name),
7548 };
7549
7550 match catalog_item {
7551 Ok(item) => {
7552 let is_type = ObjectType::from(item.item_type());
7553 if object_type == is_type {
7554 Ok(Some(item))
7555 } else {
7556 Err(PlanError::MismatchedObjectType {
7557 name: scx.catalog.minimal_qualification(item.name()),
7558 is_type,
7559 expected_type: object_type,
7560 })
7561 }
7562 }
7563 Err(_) if if_exists => Ok(None),
7564 Err(e) => Err(e.into()),
7565 }
7566}
7567
7568fn ensure_cluster_is_not_managed(
7570 scx: &StatementContext,
7571 cluster_id: ClusterId,
7572) -> Result<(), PlanError> {
7573 let cluster = scx.catalog.get_cluster(cluster_id);
7574 if cluster.is_managed() {
7575 Err(PlanError::ManagedCluster {
7576 cluster_name: cluster.name().to_string(),
7577 })
7578 } else {
7579 Ok(())
7580 }
7581}