1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::time::Duration;
19
20use itertools::{Either, Itertools};
21use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
22use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
23use mz_auth::password::Password;
24use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
25use mz_expr::{CollectionPlan, UnmaterializableFunc};
26use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
27use mz_ore::cast::{CastFrom, TryCastFrom};
28use mz_ore::collections::{CollectionExt, HashSet};
29use mz_ore::num::NonNeg;
30use mz_ore::soft_panic_or_log;
31use mz_ore::str::StrExt;
32use mz_ore::vec::VecExt;
33use mz_postgres_util::tunnel::PostgresFlavor;
34use mz_proto::RustType;
35use mz_repr::adt::interval::Interval;
36use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
37use mz_repr::network_policy_id::NetworkPolicyId;
38use mz_repr::optimize::OptimizerFeatureOverrides;
39use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42 CatalogItemId, ColumnName, ColumnType, RelationDesc, RelationType, RelationVersion,
43 RelationVersionSelector, ScalarType, Timestamp, VersionedRelationDesc, preserves_order,
44 strconv,
45};
46use mz_sql_parser::ast::{
47 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
48 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
49 AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
50 AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
51 AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
52 AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
53 AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
54 AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
55 ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
56 ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
57 ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
58 ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
59 ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
60 CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
61 CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
62 CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
63 CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
64 CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
65 CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
66 CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
67 CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
68 CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
69 CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
70 CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
71 CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
72 DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
73 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90 KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, SinkEnvelope,
91 StorageSinkConnection,
92};
93use mz_storage_types::sources::encoding::{
94 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95 SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105 LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111 PostgresSourceConnection, PostgresSourcePublicationDetails,
112 ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::SqlServerSourceExportDetails;
115use mz_storage_types::sources::{
116 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
117 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
118 SourceExportDetails, SourceExportStatementDetails, SqlServerSource, SqlServerSourceExtras,
119 Timeline,
120};
121use prost::Message;
122
123use crate::ast::display::AstDisplay;
124use crate::catalog::{
125 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
126 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
127};
128use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
129use crate::names::{
130 Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
131 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
132 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
133};
134use crate::normalize::{self, ident};
135use crate::plan::error::PlanError;
136use crate::plan::query::{
137 CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
138 scalar_type_from_sql,
139};
140use crate::plan::scope::Scope;
141use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
142use crate::plan::statement::{StatementContext, StatementDesc, scl};
143use crate::plan::typeconv::CastContext;
144use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
145use crate::plan::{
146 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
147 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
148 AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
149 AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
150 AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
151 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
152 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
153 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
154 CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
155 CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
156 CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
157 CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, Index, Ingestion,
158 MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection,
159 Params, Plan, PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig,
160 Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
161 WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
162 transform_ast,
163};
164use crate::session::vars::{
165 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
166 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
167};
168use crate::{names, parse};
169
170mod connection;
171
// Upper bound on the number of columns a planned relation may have. Within
// this part of the file it is enforced when planning webhook sources, which
// fail with `PlanError::TooManyColumns` once the limit is exceeded.
const MAX_NUM_COLUMNS: usize = 256;
177
178static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
179 std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
180
181fn check_partition_by(desc: &RelationDesc, partition_by: Vec<Ident>) -> Result<(), PlanError> {
185 for (idx, ((desc_name, desc_type), partition_name)) in
186 desc.iter().zip(partition_by.into_iter()).enumerate()
187 {
188 let partition_name = normalize::column_name(partition_name);
189 if *desc_name != partition_name {
190 sql_bail!(
191 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
192 );
193 }
194 if !preserves_order(&desc_type.scalar_type) {
195 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
196 }
197 }
198 Ok(())
199}
200
201pub fn describe_create_database(
202 _: &StatementContext,
203 _: CreateDatabaseStatement,
204) -> Result<StatementDesc, PlanError> {
205 Ok(StatementDesc::new(None))
206}
207
208pub fn plan_create_database(
209 _: &StatementContext,
210 CreateDatabaseStatement {
211 name,
212 if_not_exists,
213 }: CreateDatabaseStatement,
214) -> Result<Plan, PlanError> {
215 Ok(Plan::CreateDatabase(CreateDatabasePlan {
216 name: normalize::ident(name.0),
217 if_not_exists,
218 }))
219}
220
221pub fn describe_create_schema(
222 _: &StatementContext,
223 _: CreateSchemaStatement,
224) -> Result<StatementDesc, PlanError> {
225 Ok(StatementDesc::new(None))
226}
227
228pub fn plan_create_schema(
229 scx: &StatementContext,
230 CreateSchemaStatement {
231 mut name,
232 if_not_exists,
233 }: CreateSchemaStatement,
234) -> Result<Plan, PlanError> {
235 if name.0.len() > 2 {
236 sql_bail!("schema name {} has more than two components", name);
237 }
238 let schema_name = normalize::ident(
239 name.0
240 .pop()
241 .expect("names always have at least one component"),
242 );
243 let database_spec = match name.0.pop() {
244 None => match scx.catalog.active_database() {
245 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
246 None => sql_bail!("no database specified and no active database"),
247 },
248 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
249 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
250 Err(_) => sql_bail!("invalid database {}", n.as_str()),
251 },
252 };
253 Ok(Plan::CreateSchema(CreateSchemaPlan {
254 database_spec,
255 schema_name,
256 if_not_exists,
257 }))
258}
259
260pub fn describe_create_table(
261 _: &StatementContext,
262 _: CreateTableStatement<Aug>,
263) -> Result<StatementDesc, PlanError> {
264 Ok(StatementDesc::new(None))
265}
266
267pub fn plan_create_table(
268 scx: &StatementContext,
269 stmt: CreateTableStatement<Aug>,
270) -> Result<Plan, PlanError> {
271 let CreateTableStatement {
272 name,
273 columns,
274 constraints,
275 if_not_exists,
276 temporary,
277 with_options,
278 } = &stmt;
279
280 let names: Vec<_> = columns
281 .iter()
282 .filter(|c| {
283 let is_versioned = c
287 .options
288 .iter()
289 .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
290 !is_versioned
291 })
292 .map(|c| normalize::column_name(c.name.clone()))
293 .collect();
294
295 if let Some(dup) = names.iter().duplicates().next() {
296 sql_bail!("column {} specified more than once", dup.as_str().quoted());
297 }
298
299 let mut column_types = Vec::with_capacity(columns.len());
302 let mut defaults = Vec::with_capacity(columns.len());
303 let mut changes = BTreeMap::new();
304 let mut keys = Vec::new();
305
306 for (i, c) in columns.into_iter().enumerate() {
307 let aug_data_type = &c.data_type;
308 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
309 let mut nullable = true;
310 let mut default = Expr::null();
311 let mut versioned = false;
312 for option in &c.options {
313 match &option.option {
314 ColumnOption::NotNull => nullable = false,
315 ColumnOption::Default(expr) => {
316 let mut expr = expr.clone();
319 transform_ast::transform(scx, &mut expr)?;
320 let _ = query::plan_default_expr(scx, &expr, &ty)?;
321 default = expr.clone();
322 }
323 ColumnOption::Unique { is_primary } => {
324 keys.push(vec![i]);
325 if *is_primary {
326 nullable = false;
327 }
328 }
329 ColumnOption::Versioned { action, version } => {
330 let version = RelationVersion::from(*version);
331 versioned = true;
332
333 let name = normalize::column_name(c.name.clone());
334 let typ = ty.clone().nullable(nullable);
335
336 changes.insert(version, (action.clone(), name, typ));
337 }
338 other => {
339 bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
340 }
341 }
342 }
343 if !versioned {
346 column_types.push(ty.nullable(nullable));
347 }
348 defaults.push(default);
349 }
350
351 let mut seen_primary = false;
352 'c: for constraint in constraints {
353 match constraint {
354 TableConstraint::Unique {
355 name: _,
356 columns,
357 is_primary,
358 nulls_not_distinct,
359 } => {
360 if seen_primary && *is_primary {
361 sql_bail!(
362 "multiple primary keys for table {} are not allowed",
363 name.to_ast_string_stable()
364 );
365 }
366 seen_primary = *is_primary || seen_primary;
367
368 let mut key = vec![];
369 for column in columns {
370 let column = normalize::column_name(column.clone());
371 match names.iter().position(|name| *name == column) {
372 None => sql_bail!("unknown column in constraint: {}", column),
373 Some(i) => {
374 let nullable = &mut column_types[i].nullable;
375 if *is_primary {
376 if *nulls_not_distinct {
377 sql_bail!(
378 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
379 );
380 }
381
382 *nullable = false;
383 } else if !(*nulls_not_distinct || !*nullable) {
384 break 'c;
387 }
388
389 key.push(i);
390 }
391 }
392 }
393
394 if *is_primary {
395 keys.insert(0, key);
396 } else {
397 keys.push(key);
398 }
399 }
400 TableConstraint::ForeignKey { .. } => {
401 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
404 }
405 TableConstraint::Check { .. } => {
406 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
409 }
410 }
411 }
412
413 if !keys.is_empty() {
414 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
417 }
418
419 let typ = RelationType::new(column_types).with_keys(keys);
420
421 let temporary = *temporary;
422 let name = if temporary {
423 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
424 } else {
425 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
426 };
427
428 let full_name = scx.catalog.resolve_full_name(&name);
430 let partial_name = PartialItemName::from(full_name.clone());
431 if let (false, Ok(item)) = (
434 if_not_exists,
435 scx.catalog.resolve_item_or_type(&partial_name),
436 ) {
437 return Err(PlanError::ItemAlreadyExists {
438 name: full_name.to_string(),
439 item_type: item.item_type(),
440 });
441 }
442
443 let desc = RelationDesc::new(typ, names);
444 let mut desc = VersionedRelationDesc::new(desc);
445 for (version, (_action, name, typ)) in changes.into_iter() {
446 let new_version = desc.add_column(name, typ);
447 if version != new_version {
448 return Err(PlanError::InvalidTable {
449 name: full_name.item,
450 });
451 }
452 }
453
454 let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
455
456 let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
462 let options = plan_table_options(scx, &original_desc, with_options.clone())?;
463
464 let compaction_window = options.iter().find_map(|o| {
465 #[allow(irrefutable_let_patterns)]
466 if let crate::plan::TableOption::RetainHistory(lcw) = o {
467 Some(lcw.clone())
468 } else {
469 None
470 }
471 });
472
473 let table = Table {
474 create_sql,
475 desc,
476 temporary,
477 compaction_window,
478 data_source: TableDataSource::TableWrites { defaults },
479 };
480 Ok(Plan::CreateTable(CreateTablePlan {
481 name,
482 table,
483 if_not_exists: *if_not_exists,
484 }))
485}
486
487pub fn describe_create_table_from_source(
488 _: &StatementContext,
489 _: CreateTableFromSourceStatement<Aug>,
490) -> Result<StatementDesc, PlanError> {
491 Ok(StatementDesc::new(None))
492}
493
494pub fn describe_create_webhook_source(
495 _: &StatementContext,
496 _: CreateWebhookSourceStatement<Aug>,
497) -> Result<StatementDesc, PlanError> {
498 Ok(StatementDesc::new(None))
499}
500
501pub fn describe_create_source(
502 _: &StatementContext,
503 _: CreateSourceStatement<Aug>,
504) -> Result<StatementDesc, PlanError> {
505 Ok(StatementDesc::new(None))
506}
507
508pub fn describe_create_subsource(
509 _: &StatementContext,
510 _: CreateSubsourceStatement<Aug>,
511) -> Result<StatementDesc, PlanError> {
512 Ok(StatementDesc::new(None))
513}
514
515generate_extracted_config!(
516 CreateSourceOption,
517 (TimestampInterval, Duration),
518 (RetainHistory, OptionalDuration)
519);
520
521generate_extracted_config!(
522 PgConfigOption,
523 (Details, String),
524 (Publication, String),
525 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![]))
526);
527
528generate_extracted_config!(
529 MySqlConfigOption,
530 (Details, String),
531 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
532 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
533);
534
535generate_extracted_config!(
536 SqlServerConfigOption,
537 (Details, String),
538 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
539 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
540);
541
542pub fn plan_create_webhook_source(
543 scx: &StatementContext,
544 mut stmt: CreateWebhookSourceStatement<Aug>,
545) -> Result<Plan, PlanError> {
546 if stmt.is_table {
547 scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
548 }
549
550 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
553 if in_cluster.replica_ids().len() > 1 {
554 sql_bail!("cannot create webhook source in cluster with more than one replica")
555 }
556 let create_sql =
557 normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
558
559 let CreateWebhookSourceStatement {
560 name,
561 if_not_exists,
562 body_format,
563 include_headers,
564 validate_using,
565 is_table,
566 in_cluster: _,
568 } = stmt;
569
570 let validate_using = validate_using
571 .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
572 .transpose()?;
573 if let Some(WebhookValidation { expression, .. }) = &validate_using {
574 if !expression.contains_column() {
577 return Err(PlanError::WebhookValidationDoesNotUseColumns);
578 }
579 if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
583 return Err(PlanError::WebhookValidationNonDeterministic);
584 }
585 }
586
587 let body_format = match body_format {
588 Format::Bytes => WebhookBodyFormat::Bytes,
589 Format::Json { array } => WebhookBodyFormat::Json { array },
590 Format::Text => WebhookBodyFormat::Text,
591 ty => {
593 return Err(PlanError::Unsupported {
594 feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
595 discussion_no: None,
596 });
597 }
598 };
599
600 let mut column_ty = vec![
601 ColumnType {
603 scalar_type: ScalarType::from(body_format),
604 nullable: false,
605 },
606 ];
607 let mut column_names = vec!["body".to_string()];
608
609 let mut headers = WebhookHeaders::default();
610
611 if let Some(filters) = include_headers.column {
613 column_ty.push(ColumnType {
614 scalar_type: ScalarType::Map {
615 value_type: Box::new(ScalarType::String),
616 custom_id: None,
617 },
618 nullable: false,
619 });
620 column_names.push("headers".to_string());
621
622 let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
623 filters.into_iter().partition_map(|filter| {
624 if filter.block {
625 itertools::Either::Right(filter.header_name)
626 } else {
627 itertools::Either::Left(filter.header_name)
628 }
629 });
630 headers.header_column = Some(WebhookHeaderFilters { allow, block });
631 }
632
633 for header in include_headers.mappings {
635 let scalar_type = header
636 .use_bytes
637 .then_some(ScalarType::Bytes)
638 .unwrap_or(ScalarType::String);
639 column_ty.push(ColumnType {
640 scalar_type,
641 nullable: true,
642 });
643 column_names.push(header.column_name.into_string());
644
645 let column_idx = column_ty.len() - 1;
646 assert_eq!(
648 column_idx,
649 column_names.len() - 1,
650 "header column names and types don't match"
651 );
652 headers
653 .mapped_headers
654 .insert(column_idx, (header.header_name, header.use_bytes));
655 }
656
657 let mut unique_check = HashSet::with_capacity(column_names.len());
659 for name in &column_names {
660 if !unique_check.insert(name) {
661 return Err(PlanError::AmbiguousColumn(name.clone().into()));
662 }
663 }
664 if column_names.len() > MAX_NUM_COLUMNS {
665 return Err(PlanError::TooManyColumns {
666 max_num_columns: MAX_NUM_COLUMNS,
667 req_num_columns: column_names.len(),
668 });
669 }
670
671 let typ = RelationType::new(column_ty);
672 let desc = RelationDesc::new(typ, column_names);
673
674 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
676 let full_name = scx.catalog.resolve_full_name(&name);
677 let partial_name = PartialItemName::from(full_name.clone());
678 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
679 return Err(PlanError::ItemAlreadyExists {
680 name: full_name.to_string(),
681 item_type: item.item_type(),
682 });
683 }
684
685 let timeline = Timeline::EpochMilliseconds;
688
689 let plan = if is_table {
690 let data_source = DataSourceDesc::Webhook {
691 validate_using,
692 body_format,
693 headers,
694 cluster_id: Some(in_cluster.id()),
695 };
696 let data_source = TableDataSource::DataSource {
697 desc: data_source,
698 timeline,
699 };
700 Plan::CreateTable(CreateTablePlan {
701 name,
702 if_not_exists,
703 table: Table {
704 create_sql,
705 desc: VersionedRelationDesc::new(desc),
706 temporary: false,
707 compaction_window: None,
708 data_source,
709 },
710 })
711 } else {
712 let data_source = DataSourceDesc::Webhook {
713 validate_using,
714 body_format,
715 headers,
716 cluster_id: None,
718 };
719 Plan::CreateSource(CreateSourcePlan {
720 name,
721 source: Source {
722 create_sql,
723 data_source,
724 desc,
725 compaction_window: None,
726 },
727 if_not_exists,
728 timeline,
729 in_cluster: Some(in_cluster.id()),
730 })
731 };
732
733 Ok(plan)
734}
735
736pub fn plan_create_source(
737 scx: &StatementContext,
738 mut stmt: CreateSourceStatement<Aug>,
739) -> Result<Plan, PlanError> {
740 let CreateSourceStatement {
741 name,
742 in_cluster: _,
743 col_names,
744 connection: source_connection,
745 envelope,
746 if_not_exists,
747 format,
748 key_constraint,
749 include_metadata,
750 with_options,
751 external_references: referenced_subsources,
752 progress_subsource,
753 } = &stmt;
754
755 mz_ore::soft_assert_or_log!(
756 referenced_subsources.is_none(),
757 "referenced subsources must be cleared in purification"
758 );
759
760 let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
761 && scx.catalog.system_vars().force_source_table_syntax();
762
763 if force_source_table_syntax {
766 if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
767 Err(PlanError::UseTablesForSources(
768 "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
769 ))?;
770 }
771 }
772
773 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
774
775 if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
776 && include_metadata
777 .iter()
778 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
779 {
780 sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
782 }
783 if !matches!(
784 source_connection,
785 CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
786 ) && !include_metadata.is_empty()
787 {
788 bail_unsupported!("INCLUDE metadata with non-Kafka sources");
789 }
790
791 let external_connection = match source_connection {
792 CreateSourceConnection::Kafka {
793 connection: connection_name,
794 options,
795 } => {
796 let connection_item = scx.get_item_by_resolved_name(connection_name)?;
797 if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
798 sql_bail!(
799 "{} is not a kafka connection",
800 scx.catalog.resolve_full_name(connection_item.name())
801 )
802 }
803
804 let KafkaSourceConfigOptionExtracted {
805 group_id_prefix,
806 topic,
807 topic_metadata_refresh_interval,
808 start_timestamp: _, start_offset,
810 seen: _,
811 }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
812
813 let topic = topic.expect("validated exists during purification");
814
815 let mut start_offsets = BTreeMap::new();
816 if let Some(offsets) = start_offset {
817 for (part, offset) in offsets.iter().enumerate() {
818 if *offset < 0 {
819 sql_bail!("START OFFSET must be a nonnegative integer");
820 }
821 start_offsets.insert(i32::try_from(part)?, *offset);
822 }
823 }
824
825 if !start_offsets.is_empty() && envelope.requires_all_input() {
826 sql_bail!("START OFFSET is not supported with ENVELOPE {}", envelope)
827 }
828
829 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
830 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
833 }
834
835 if !include_metadata.is_empty()
836 && !matches!(
837 envelope,
838 ast::SourceEnvelope::Upsert { .. }
839 | ast::SourceEnvelope::None
840 | ast::SourceEnvelope::Debezium
841 )
842 {
843 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
845 }
846
847 let metadata_columns = include_metadata
851 .into_iter()
852 .flat_map(|item| match item {
853 SourceIncludeMetadata::Timestamp { alias } => {
854 let name = match alias {
855 Some(name) => name.to_string(),
856 None => "timestamp".to_owned(),
857 };
858 Some((name, KafkaMetadataKind::Timestamp))
859 }
860 SourceIncludeMetadata::Partition { alias } => {
861 let name = match alias {
862 Some(name) => name.to_string(),
863 None => "partition".to_owned(),
864 };
865 Some((name, KafkaMetadataKind::Partition))
866 }
867 SourceIncludeMetadata::Offset { alias } => {
868 let name = match alias {
869 Some(name) => name.to_string(),
870 None => "offset".to_owned(),
871 };
872 Some((name, KafkaMetadataKind::Offset))
873 }
874 SourceIncludeMetadata::Headers { alias } => {
875 let name = match alias {
876 Some(name) => name.to_string(),
877 None => "headers".to_owned(),
878 };
879 Some((name, KafkaMetadataKind::Headers))
880 }
881 SourceIncludeMetadata::Header {
882 alias,
883 key,
884 use_bytes,
885 } => Some((
886 alias.to_string(),
887 KafkaMetadataKind::Header {
888 key: key.clone(),
889 use_bytes: *use_bytes,
890 },
891 )),
892 SourceIncludeMetadata::Key { .. } => {
893 None
895 }
896 })
897 .collect();
898
899 let connection = KafkaSourceConnection::<ReferencedConnection> {
900 connection: connection_item.id(),
901 connection_id: connection_item.id(),
902 topic,
903 start_offsets,
904 group_id_prefix,
905 topic_metadata_refresh_interval,
906 metadata_columns,
907 };
908
909 GenericSourceConnection::Kafka(connection)
910 }
911 CreateSourceConnection::Postgres {
912 connection,
913 options,
914 }
915 | CreateSourceConnection::Yugabyte {
916 connection,
917 options,
918 } => {
919 let (source_flavor, flavor_name) = match source_connection {
920 CreateSourceConnection::Postgres { .. } => (PostgresFlavor::Vanilla, "PostgreSQL"),
921 CreateSourceConnection::Yugabyte { .. } => (PostgresFlavor::Yugabyte, "Yugabyte"),
922 _ => unreachable!(),
923 };
924
925 let connection_item = scx.get_item_by_resolved_name(connection)?;
926 let connection_mismatch = match connection_item.connection()? {
927 Connection::Postgres(conn) => source_flavor != conn.flavor,
928 _ => true,
929 };
930 if connection_mismatch {
931 sql_bail!(
932 "{} is not a {flavor_name} connection",
933 scx.catalog.resolve_full_name(connection_item.name())
934 )
935 }
936
937 let PgConfigOptionExtracted {
938 details,
939 publication,
940 text_columns: _,
944 seen: _,
945 } = options.clone().try_into()?;
946
947 let details = details
948 .as_ref()
949 .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
950 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
951 let details = ProtoPostgresSourcePublicationDetails::decode(&*details)
952 .map_err(|e| sql_err!("{}", e))?;
953
954 let publication_details = PostgresSourcePublicationDetails::from_proto(details)
955 .map_err(|e| sql_err!("{}", e))?;
956
957 let connection =
958 GenericSourceConnection::<ReferencedConnection>::from(PostgresSourceConnection {
959 connection: connection_item.id(),
960 connection_id: connection_item.id(),
961 publication: publication.expect("validated exists during purification"),
962 publication_details,
963 });
964
965 connection
966 }
967 CreateSourceConnection::SqlServer {
968 connection,
969 options: _,
970 } => {
971 let connection_item = scx.get_item_by_resolved_name(connection)?;
972 match connection_item.connection()? {
973 Connection::SqlServer(connection) => connection,
974 _ => sql_bail!(
975 "{} is not a SQL Server connection",
976 scx.catalog.resolve_full_name(connection_item.name())
977 ),
978 };
979 let extras = SqlServerSourceExtras {};
982 let connection =
983 GenericSourceConnection::<ReferencedConnection>::from(SqlServerSource {
984 catalog_id: connection_item.id(),
985 connection: connection_item.id(),
986 extras,
987 });
988
989 connection
990 }
991 CreateSourceConnection::MySql {
992 connection,
993 options,
994 } => {
995 let connection_item = scx.get_item_by_resolved_name(connection)?;
996 match connection_item.connection()? {
997 Connection::MySql(connection) => connection,
998 _ => sql_bail!(
999 "{} is not a MySQL connection",
1000 scx.catalog.resolve_full_name(connection_item.name())
1001 ),
1002 };
1003 let MySqlConfigOptionExtracted {
1004 details,
1005 text_columns: _,
1009 exclude_columns: _,
1010 seen: _,
1011 } = options.clone().try_into()?;
1012
1013 let details = details
1014 .as_ref()
1015 .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
1016 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1017 let details =
1018 ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1019 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1020
1021 let connection =
1022 GenericSourceConnection::<ReferencedConnection>::from(MySqlSourceConnection {
1023 connection: connection_item.id(),
1024 connection_id: connection_item.id(),
1025 details,
1026 });
1027
1028 connection
1029 }
1030 CreateSourceConnection::LoadGenerator { generator, options } => {
1031 let load_generator =
1032 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1033
1034 let LoadGeneratorOptionExtracted {
1035 tick_interval,
1036 as_of,
1037 up_to,
1038 ..
1039 } = options.clone().try_into()?;
1040 let tick_micros = match tick_interval {
1041 Some(interval) => Some(interval.as_micros().try_into()?),
1042 None => None,
1043 };
1044
1045 if up_to < as_of {
1046 sql_bail!("UP TO cannot be less than AS OF");
1047 }
1048
1049 let connection = GenericSourceConnection::from(LoadGeneratorSourceConnection {
1050 load_generator,
1051 tick_micros,
1052 as_of,
1053 up_to,
1054 });
1055
1056 connection
1057 }
1058 };
1059
1060 let CreateSourceOptionExtracted {
1061 timestamp_interval,
1062 retain_history,
1063 seen: _,
1064 } = CreateSourceOptionExtracted::try_from(with_options.clone())?;
1065
1066 let metadata_columns_desc = match external_connection {
1067 GenericSourceConnection::Kafka(KafkaSourceConnection {
1068 ref metadata_columns,
1069 ..
1070 }) => kafka_metadata_columns_desc(metadata_columns),
1071 _ => vec![],
1072 };
1073
1074 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1076 scx,
1077 &envelope,
1078 format,
1079 Some(external_connection.default_key_desc()),
1080 external_connection.default_value_desc(),
1081 include_metadata,
1082 metadata_columns_desc,
1083 &external_connection,
1084 )?;
1085 plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;
1086
1087 let names: Vec<_> = desc.iter_names().cloned().collect();
1088 if let Some(dup) = names.iter().duplicates().next() {
1089 sql_bail!("column {} specified more than once", dup.as_str().quoted());
1090 }
1091
1092 if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
1094 scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;
1097
1098 let key_columns = columns
1099 .into_iter()
1100 .map(normalize::column_name)
1101 .collect::<Vec<_>>();
1102
1103 let mut uniq = BTreeSet::new();
1104 for col in key_columns.iter() {
1105 if !uniq.insert(col) {
1106 sql_bail!("Repeated column name in source key constraint: {}", col);
1107 }
1108 }
1109
1110 let key_indices = key_columns
1111 .iter()
1112 .map(|col| {
1113 let name_idx = desc
1114 .get_by_name(col)
1115 .map(|(idx, _type)| idx)
1116 .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
1117 if desc.get_unambiguous_name(name_idx).is_none() {
1118 sql_bail!("Ambiguous column in source key constraint: {}", col);
1119 }
1120 Ok(name_idx)
1121 })
1122 .collect::<Result<Vec<_>, _>>()?;
1123
1124 if !desc.typ().keys.is_empty() {
1125 return Err(key_constraint_err(&desc, &key_columns));
1126 } else {
1127 desc = desc.with_key(key_indices);
1128 }
1129 }
1130
1131 let timestamp_interval = match timestamp_interval {
1132 Some(duration) => {
1133 let min = scx.catalog.system_vars().min_timestamp_interval();
1134 let max = scx.catalog.system_vars().max_timestamp_interval();
1135 if duration < min || duration > max {
1136 return Err(PlanError::InvalidTimestampInterval {
1137 min,
1138 max,
1139 requested: duration,
1140 });
1141 }
1142 duration
1143 }
1144 None => scx.catalog.config().timestamp_interval,
1145 };
1146
1147 let (primary_export, primary_export_details) = if force_source_table_syntax {
1148 (
1149 SourceExportDataConfig {
1150 encoding: None,
1151 envelope: SourceEnvelope::None(NoneEnvelope {
1152 key_envelope: KeyEnvelope::None,
1153 key_arity: 0,
1154 }),
1155 },
1156 SourceExportDetails::None,
1157 )
1158 } else {
1159 (
1160 SourceExportDataConfig {
1161 encoding,
1162 envelope: envelope.clone(),
1163 },
1164 external_connection.primary_export_details(),
1165 )
1166 };
1167 let source_desc = SourceDesc::<ReferencedConnection> {
1168 connection: external_connection,
1169 primary_export,
1173 primary_export_details,
1174 timestamp_interval,
1175 };
1176
1177 let progress_subsource = match progress_subsource {
1178 Some(name) => match name {
1179 DeferredItemName::Named(name) => match name {
1180 ResolvedItemName::Item { id, .. } => *id,
1181 ResolvedItemName::Cte { .. }
1182 | ResolvedItemName::ContinualTask { .. }
1183 | ResolvedItemName::Error => {
1184 sql_bail!("[internal error] invalid target id")
1185 }
1186 },
1187 DeferredItemName::Deferred(_) => {
1188 sql_bail!("[internal error] progress subsource must be named during purification")
1189 }
1190 },
1191 _ => sql_bail!("[internal error] progress subsource must be named during purification"),
1192 };
1193
1194 let if_not_exists = *if_not_exists;
1195 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1196
1197 let full_name = scx.catalog.resolve_full_name(&name);
1199 let partial_name = PartialItemName::from(full_name.clone());
1200 if let (false, Ok(item)) = (
1203 if_not_exists,
1204 scx.catalog.resolve_item_or_type(&partial_name),
1205 ) {
1206 return Err(PlanError::ItemAlreadyExists {
1207 name: full_name.to_string(),
1208 item_type: item.item_type(),
1209 });
1210 }
1211
1212 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
1216
1217 let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;
1218
1219 let timeline = match envelope {
1221 SourceEnvelope::CdcV2 => {
1222 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1223 }
1224 _ => Timeline::EpochMilliseconds,
1225 };
1226
1227 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1228 let source = Source {
1229 create_sql,
1230 data_source: DataSourceDesc::Ingestion(Ingestion {
1231 desc: source_desc,
1232 progress_subsource,
1233 }),
1234 desc,
1235 compaction_window,
1236 };
1237
1238 Ok(Plan::CreateSource(CreateSourcePlan {
1239 name,
1240 source,
1241 if_not_exists,
1242 timeline,
1243 in_cluster: Some(in_cluster.id()),
1244 }))
1245}
1246
1247fn apply_source_envelope_encoding(
1248 scx: &StatementContext,
1249 envelope: &ast::SourceEnvelope,
1250 format: &Option<FormatSpecifier<Aug>>,
1251 key_desc: Option<RelationDesc>,
1252 value_desc: RelationDesc,
1253 include_metadata: &[SourceIncludeMetadata],
1254 metadata_columns_desc: Vec<(&str, ColumnType)>,
1255 source_connection: &GenericSourceConnection<ReferencedConnection>,
1256) -> Result<
1257 (
1258 RelationDesc,
1259 SourceEnvelope,
1260 Option<SourceDataEncoding<ReferencedConnection>>,
1261 ),
1262 PlanError,
1263> {
1264 let encoding = match format {
1265 Some(format) => Some(get_encoding(scx, format, envelope)?),
1266 None => None,
1267 };
1268
1269 let (key_desc, value_desc) = match &encoding {
1270 Some(encoding) => {
1271 match value_desc.typ().columns() {
1274 [typ] => match typ.scalar_type {
1275 ScalarType::Bytes => {}
1276 _ => sql_bail!(
1277 "The schema produced by the source is incompatible with format decoding"
1278 ),
1279 },
1280 _ => sql_bail!(
1281 "The schema produced by the source is incompatible with format decoding"
1282 ),
1283 }
1284
1285 let (key_desc, value_desc) = encoding.desc()?;
1286
1287 let key_desc = key_desc.map(|desc| {
1294 let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1295 let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1296 if is_kafka && is_envelope_none {
1297 RelationDesc::from_names_and_types(
1298 desc.into_iter()
1299 .map(|(name, typ)| (name, typ.nullable(true))),
1300 )
1301 } else {
1302 desc
1303 }
1304 });
1305 (key_desc, value_desc)
1306 }
1307 None => (key_desc, value_desc),
1308 };
1309
1310 let key_envelope_no_encoding = matches!(
1323 source_connection,
1324 GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1325 load_generator: LoadGenerator::KeyValue(_),
1326 ..
1327 })
1328 );
1329 let mut key_envelope = get_key_envelope(
1330 include_metadata,
1331 encoding.as_ref(),
1332 key_envelope_no_encoding,
1333 )?;
1334
1335 match (&envelope, &key_envelope) {
1336 (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1337 (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1338 "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1339 ),
1340 _ => {}
1341 };
1342
1343 let envelope = match &envelope {
1352 ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1354 ast::SourceEnvelope::Debezium => {
1355 let after_idx = match typecheck_debezium(&value_desc) {
1357 Ok((_before_idx, after_idx)) => Ok(after_idx),
1358 Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1359 Some(DataEncoding::Avro(_)) => Err(type_err),
1360 _ => Err(sql_err!(
1361 "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1362 )),
1363 },
1364 }?;
1365
1366 UnplannedSourceEnvelope::Upsert {
1367 style: UpsertStyle::Debezium { after_idx },
1368 }
1369 }
1370 ast::SourceEnvelope::Upsert {
1371 value_decode_err_policy,
1372 } => {
1373 let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1374 None => {
1375 if !key_envelope_no_encoding {
1376 bail_unsupported!(format!(
1377 "UPSERT requires a key/value format: {:?}",
1378 format
1379 ))
1380 }
1381 None
1382 }
1383 Some(key_encoding) => Some(key_encoding),
1384 };
1385 if key_envelope == KeyEnvelope::None {
1388 key_envelope = get_unnamed_key_envelope(key_encoding)?;
1389 }
1390 let style = match value_decode_err_policy.as_slice() {
1392 [] => UpsertStyle::Default(key_envelope),
1393 [SourceErrorPolicy::Inline { alias }] => {
1394 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1395 UpsertStyle::ValueErrInline {
1396 key_envelope,
1397 error_column: alias
1398 .as_ref()
1399 .map_or_else(|| "error".to_string(), |a| a.to_string()),
1400 }
1401 }
1402 _ => {
1403 bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1404 }
1405 };
1406
1407 UnplannedSourceEnvelope::Upsert { style }
1408 }
1409 ast::SourceEnvelope::CdcV2 => {
1410 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1411 match format {
1413 Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1414 _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1415 }
1416 UnplannedSourceEnvelope::CdcV2
1417 }
1418 };
1419
1420 let metadata_desc = included_column_desc(metadata_columns_desc);
1421 let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1422
1423 Ok((desc, envelope, encoding))
1424}
1425
1426fn plan_source_export_desc(
1429 scx: &StatementContext,
1430 name: &UnresolvedItemName,
1431 columns: &Vec<ColumnDef<Aug>>,
1432 constraints: &Vec<TableConstraint<Aug>>,
1433) -> Result<RelationDesc, PlanError> {
1434 let names: Vec<_> = columns
1435 .iter()
1436 .map(|c| normalize::column_name(c.name.clone()))
1437 .collect();
1438
1439 if let Some(dup) = names.iter().duplicates().next() {
1440 sql_bail!("column {} specified more than once", dup.as_str().quoted());
1441 }
1442
1443 let mut column_types = Vec::with_capacity(columns.len());
1446 let mut keys = Vec::new();
1447
1448 for (i, c) in columns.into_iter().enumerate() {
1449 let aug_data_type = &c.data_type;
1450 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1451 let mut nullable = true;
1452 for option in &c.options {
1453 match &option.option {
1454 ColumnOption::NotNull => nullable = false,
1455 ColumnOption::Default(_) => {
1456 bail_unsupported!("Source export with default value")
1457 }
1458 ColumnOption::Unique { is_primary } => {
1459 keys.push(vec![i]);
1460 if *is_primary {
1461 nullable = false;
1462 }
1463 }
1464 other => {
1465 bail_unsupported!(format!("Source export with column constraint: {}", other))
1466 }
1467 }
1468 }
1469 column_types.push(ty.nullable(nullable));
1470 }
1471
1472 let mut seen_primary = false;
1473 'c: for constraint in constraints {
1474 match constraint {
1475 TableConstraint::Unique {
1476 name: _,
1477 columns,
1478 is_primary,
1479 nulls_not_distinct,
1480 } => {
1481 if seen_primary && *is_primary {
1482 sql_bail!(
1483 "multiple primary keys for source export {} are not allowed",
1484 name.to_ast_string_stable()
1485 );
1486 }
1487 seen_primary = *is_primary || seen_primary;
1488
1489 let mut key = vec![];
1490 for column in columns {
1491 let column = normalize::column_name(column.clone());
1492 match names.iter().position(|name| *name == column) {
1493 None => sql_bail!("unknown column in constraint: {}", column),
1494 Some(i) => {
1495 let nullable = &mut column_types[i].nullable;
1496 if *is_primary {
1497 if *nulls_not_distinct {
1498 sql_bail!(
1499 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1500 );
1501 }
1502 *nullable = false;
1503 } else if !(*nulls_not_distinct || !*nullable) {
1504 break 'c;
1507 }
1508
1509 key.push(i);
1510 }
1511 }
1512 }
1513
1514 if *is_primary {
1515 keys.insert(0, key);
1516 } else {
1517 keys.push(key);
1518 }
1519 }
1520 TableConstraint::ForeignKey { .. } => {
1521 bail_unsupported!("Source export with a foreign key")
1522 }
1523 TableConstraint::Check { .. } => {
1524 bail_unsupported!("Source export with a check constraint")
1525 }
1526 }
1527 }
1528
1529 let typ = RelationType::new(column_types).with_keys(keys);
1530 let desc = RelationDesc::new(typ, names);
1531 Ok(desc)
1532}
1533
1534generate_extracted_config!(
1535 CreateSubsourceOption,
1536 (Progress, bool, Default(false)),
1537 (ExternalReference, UnresolvedItemName),
1538 (TextColumns, Vec::<Ident>, Default(vec![])),
1539 (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1540 (Details, String)
1541);
1542
1543pub fn plan_create_subsource(
1544 scx: &StatementContext,
1545 stmt: CreateSubsourceStatement<Aug>,
1546) -> Result<Plan, PlanError> {
1547 let CreateSubsourceStatement {
1548 name,
1549 columns,
1550 of_source,
1551 constraints,
1552 if_not_exists,
1553 with_options,
1554 } = &stmt;
1555
1556 let CreateSubsourceOptionExtracted {
1557 progress,
1558 external_reference,
1559 text_columns,
1560 exclude_columns,
1561 details,
1562 seen: _,
1563 } = with_options.clone().try_into()?;
1564
1565 assert!(
1570 progress ^ (external_reference.is_some() && of_source.is_some()),
1571 "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1572 );
1573
1574 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1575
1576 let data_source = if let Some(source_reference) = of_source {
1577 if scx.catalog.system_vars().enable_create_table_from_source()
1580 && scx.catalog.system_vars().force_source_table_syntax()
1581 {
1582 Err(PlanError::UseTablesForSources(
1583 "CREATE SUBSOURCE".to_string(),
1584 ))?;
1585 }
1586
1587 let ingestion_id = *source_reference.item_id();
1590 let external_reference = external_reference.unwrap();
1591
1592 let details = details
1595 .as_ref()
1596 .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1597 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1598 let details =
1599 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1600 let details =
1601 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1602 let details = match details {
1603 SourceExportStatementDetails::Postgres { table } => {
1604 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1605 column_casts: crate::pure::postgres::generate_column_casts(
1606 scx,
1607 &table,
1608 &text_columns,
1609 )?,
1610 table,
1611 })
1612 }
1613 SourceExportStatementDetails::MySql {
1614 table,
1615 initial_gtid_set,
1616 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1617 table,
1618 initial_gtid_set,
1619 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1620 exclude_columns: exclude_columns
1621 .into_iter()
1622 .map(|c| c.into_string())
1623 .collect(),
1624 }),
1625 SourceExportStatementDetails::SqlServer {
1626 table,
1627 capture_instance,
1628 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1629 capture_instance,
1630 table,
1631 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1632 exclude_columns: exclude_columns
1633 .into_iter()
1634 .map(|c| c.into_string())
1635 .collect(),
1636 }),
1637 SourceExportStatementDetails::LoadGenerator { output } => {
1638 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1639 }
1640 SourceExportStatementDetails::Kafka {} => {
1641 bail_unsupported!("subsources cannot reference Kafka sources")
1642 }
1643 };
1644 DataSourceDesc::IngestionExport {
1645 ingestion_id,
1646 external_reference,
1647 details,
1648 data_config: SourceExportDataConfig {
1650 envelope: SourceEnvelope::None(NoneEnvelope {
1651 key_envelope: KeyEnvelope::None,
1652 key_arity: 0,
1653 }),
1654 encoding: None,
1655 },
1656 }
1657 } else if progress {
1658 DataSourceDesc::Progress
1659 } else {
1660 panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1661 };
1662
1663 let if_not_exists = *if_not_exists;
1664 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1665
1666 let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1667
1668 let source = Source {
1669 create_sql,
1670 data_source,
1671 desc,
1672 compaction_window: None,
1673 };
1674
1675 Ok(Plan::CreateSource(CreateSourcePlan {
1676 name,
1677 source,
1678 if_not_exists,
1679 timeline: Timeline::EpochMilliseconds,
1680 in_cluster: None,
1681 }))
1682}
1683
1684generate_extracted_config!(
1685 TableFromSourceOption,
1686 (TextColumns, Vec::<Ident>, Default(vec![])),
1687 (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1688 (PartitionBy, Vec<Ident>),
1689 (Details, String)
1690);
1691
1692pub fn plan_create_table_from_source(
1693 scx: &StatementContext,
1694 stmt: CreateTableFromSourceStatement<Aug>,
1695) -> Result<Plan, PlanError> {
1696 if !scx.catalog.system_vars().enable_create_table_from_source() {
1697 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1698 }
1699
1700 let CreateTableFromSourceStatement {
1701 name,
1702 columns,
1703 constraints,
1704 if_not_exists,
1705 source,
1706 external_reference,
1707 envelope,
1708 format,
1709 include_metadata,
1710 with_options,
1711 } = &stmt;
1712
1713 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1714
1715 let TableFromSourceOptionExtracted {
1716 text_columns,
1717 exclude_columns,
1718 partition_by,
1719 details,
1720 seen: _,
1721 } = with_options.clone().try_into()?;
1722
1723 let source_item = scx.get_item_by_resolved_name(source)?;
1724 let ingestion_id = source_item.id();
1725
1726 let details = details
1729 .as_ref()
1730 .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1731 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1732 let details =
1733 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1734 let details =
1735 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1736
1737 if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1738 && include_metadata
1739 .iter()
1740 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1741 {
1742 sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1744 }
1745 if !matches!(
1746 details,
1747 SourceExportStatementDetails::Kafka { .. }
1748 | SourceExportStatementDetails::LoadGenerator { .. }
1749 ) && !include_metadata.is_empty()
1750 {
1751 bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1752 }
1753
1754 let details = match details {
1755 SourceExportStatementDetails::Postgres { table } => {
1756 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1757 column_casts: crate::pure::postgres::generate_column_casts(
1758 scx,
1759 &table,
1760 &text_columns,
1761 )?,
1762 table,
1763 })
1764 }
1765 SourceExportStatementDetails::MySql {
1766 table,
1767 initial_gtid_set,
1768 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1769 table,
1770 initial_gtid_set,
1771 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1772 exclude_columns: exclude_columns
1773 .into_iter()
1774 .map(|c| c.into_string())
1775 .collect(),
1776 }),
1777 SourceExportStatementDetails::SqlServer {
1778 table,
1779 capture_instance,
1780 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1781 table,
1782 capture_instance,
1783 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1784 exclude_columns: exclude_columns
1785 .into_iter()
1786 .map(|c| c.into_string())
1787 .collect(),
1788 }),
1789 SourceExportStatementDetails::LoadGenerator { output } => {
1790 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1791 }
1792 SourceExportStatementDetails::Kafka {} => {
1793 if !include_metadata.is_empty()
1794 && !matches!(
1795 envelope,
1796 ast::SourceEnvelope::Upsert { .. }
1797 | ast::SourceEnvelope::None
1798 | ast::SourceEnvelope::Debezium
1799 )
1800 {
1801 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1803 }
1804
1805 let metadata_columns = include_metadata
1806 .into_iter()
1807 .flat_map(|item| match item {
1808 SourceIncludeMetadata::Timestamp { alias } => {
1809 let name = match alias {
1810 Some(name) => name.to_string(),
1811 None => "timestamp".to_owned(),
1812 };
1813 Some((name, KafkaMetadataKind::Timestamp))
1814 }
1815 SourceIncludeMetadata::Partition { alias } => {
1816 let name = match alias {
1817 Some(name) => name.to_string(),
1818 None => "partition".to_owned(),
1819 };
1820 Some((name, KafkaMetadataKind::Partition))
1821 }
1822 SourceIncludeMetadata::Offset { alias } => {
1823 let name = match alias {
1824 Some(name) => name.to_string(),
1825 None => "offset".to_owned(),
1826 };
1827 Some((name, KafkaMetadataKind::Offset))
1828 }
1829 SourceIncludeMetadata::Headers { alias } => {
1830 let name = match alias {
1831 Some(name) => name.to_string(),
1832 None => "headers".to_owned(),
1833 };
1834 Some((name, KafkaMetadataKind::Headers))
1835 }
1836 SourceIncludeMetadata::Header {
1837 alias,
1838 key,
1839 use_bytes,
1840 } => Some((
1841 alias.to_string(),
1842 KafkaMetadataKind::Header {
1843 key: key.clone(),
1844 use_bytes: *use_bytes,
1845 },
1846 )),
1847 SourceIncludeMetadata::Key { .. } => {
1848 None
1850 }
1851 })
1852 .collect();
1853
1854 SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1855 }
1856 };
1857
1858 let source_connection = &source_item.source_desc()?.expect("is source").connection;
1859
1860 let (key_desc, value_desc) =
1865 if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1866 let columns = match columns {
1867 TableFromSourceColumns::Defined(columns) => columns,
1868 _ => unreachable!(),
1869 };
1870 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1871 (None, desc)
1872 } else {
1873 let key_desc = source_connection.default_key_desc();
1874 let value_desc = source_connection.default_value_desc();
1875 (Some(key_desc), value_desc)
1876 };
1877
1878 let metadata_columns_desc = match &details {
1879 SourceExportDetails::Kafka(KafkaSourceExportDetails {
1880 metadata_columns, ..
1881 }) => kafka_metadata_columns_desc(metadata_columns),
1882 _ => vec![],
1883 };
1884
1885 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1886 scx,
1887 &envelope,
1888 format,
1889 key_desc,
1890 value_desc,
1891 include_metadata,
1892 metadata_columns_desc,
1893 source_connection,
1894 )?;
1895 if let TableFromSourceColumns::Named(col_names) = columns {
1896 plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1897 }
1898
1899 let names: Vec<_> = desc.iter_names().cloned().collect();
1900 if let Some(dup) = names.iter().duplicates().next() {
1901 sql_bail!("column {} specified more than once", dup.as_str().quoted());
1902 }
1903
1904 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1905
1906 let timeline = match envelope {
1909 SourceEnvelope::CdcV2 => {
1910 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1911 }
1912 _ => Timeline::EpochMilliseconds,
1913 };
1914
1915 if let Some(partition_by) = partition_by {
1916 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1917 check_partition_by(&desc, partition_by)?;
1918 }
1919
1920 let data_source = DataSourceDesc::IngestionExport {
1921 ingestion_id,
1922 external_reference: external_reference
1923 .as_ref()
1924 .expect("populated in purification")
1925 .clone(),
1926 details,
1927 data_config: SourceExportDataConfig { envelope, encoding },
1928 };
1929
1930 let if_not_exists = *if_not_exists;
1931
1932 let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1933
1934 let table = Table {
1935 create_sql,
1936 desc: VersionedRelationDesc::new(desc),
1937 temporary: false,
1938 compaction_window: None,
1939 data_source: TableDataSource::DataSource {
1940 desc: data_source,
1941 timeline,
1942 },
1943 };
1944
1945 Ok(Plan::CreateTable(CreateTablePlan {
1946 name,
1947 table,
1948 if_not_exists,
1949 }))
1950}
1951
1952generate_extracted_config!(
1953 LoadGeneratorOption,
1954 (TickInterval, Duration),
1955 (AsOf, u64, Default(0_u64)),
1956 (UpTo, u64, Default(u64::MAX)),
1957 (ScaleFactor, f64),
1958 (MaxCardinality, u64),
1959 (Keys, u64),
1960 (SnapshotRounds, u64),
1961 (TransactionalSnapshot, bool),
1962 (ValueSize, u64),
1963 (Seed, u64),
1964 (Partitions, u64),
1965 (BatchSize, u64)
1966);
1967
1968impl LoadGeneratorOptionExtracted {
1969 pub(super) fn ensure_only_valid_options(
1970 &self,
1971 loadgen: &ast::LoadGenerator,
1972 ) -> Result<(), PlanError> {
1973 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
1974
1975 let mut options = self.seen.clone();
1976
1977 let permitted_options: &[_] = match loadgen {
1978 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
1979 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
1980 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
1981 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
1982 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
1983 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
1984 ast::LoadGenerator::KeyValue => &[
1985 TickInterval,
1986 Keys,
1987 SnapshotRounds,
1988 TransactionalSnapshot,
1989 ValueSize,
1990 Seed,
1991 Partitions,
1992 BatchSize,
1993 ],
1994 };
1995
1996 for o in permitted_options {
1997 options.remove(o);
1998 }
1999
2000 if !options.is_empty() {
2001 sql_bail!(
2002 "{} load generators do not support {} values",
2003 loadgen,
2004 options.iter().join(", ")
2005 )
2006 }
2007
2008 Ok(())
2009 }
2010}
2011
2012pub(crate) fn load_generator_ast_to_generator(
2013 scx: &StatementContext,
2014 loadgen: &ast::LoadGenerator,
2015 options: &[LoadGeneratorOption<Aug>],
2016 include_metadata: &[SourceIncludeMetadata],
2017) -> Result<LoadGenerator, PlanError> {
2018 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2019 extracted.ensure_only_valid_options(loadgen)?;
2020
2021 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2022 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2023 }
2024
2025 let load_generator = match loadgen {
2026 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2027 ast::LoadGenerator::Clock => {
2028 scx.require_feature_flag(&crate::session::vars::ENABLE_CLOCK_LOAD_GENERATOR)?;
2029 LoadGenerator::Clock
2030 }
2031 ast::LoadGenerator::Counter => {
2032 let LoadGeneratorOptionExtracted {
2033 max_cardinality, ..
2034 } = extracted;
2035 LoadGenerator::Counter { max_cardinality }
2036 }
2037 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2038 ast::LoadGenerator::Datums => LoadGenerator::Datums,
2039 ast::LoadGenerator::Tpch => {
2040 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2041
2042 let sf: f64 = scale_factor.unwrap_or(0.01);
2044 if !sf.is_finite() || sf < 0.0 {
2045 sql_bail!("unsupported scale factor {sf}");
2046 }
2047
2048 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2049 let total = (sf * multiplier).floor();
2050 let mut i = i64::try_cast_from(total)
2051 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2052 if i < 1 {
2053 i = 1;
2054 }
2055 Ok(i)
2056 };
2057
2058 let count_supplier = f_to_i(10_000f64)?;
2061 let count_part = f_to_i(200_000f64)?;
2062 let count_customer = f_to_i(150_000f64)?;
2063 let count_orders = f_to_i(150_000f64 * 10f64)?;
2064 let count_clerk = f_to_i(1_000f64)?;
2065
2066 LoadGenerator::Tpch {
2067 count_supplier,
2068 count_part,
2069 count_customer,
2070 count_orders,
2071 count_clerk,
2072 }
2073 }
2074 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2075 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2076 let LoadGeneratorOptionExtracted {
2077 keys,
2078 snapshot_rounds,
2079 transactional_snapshot,
2080 value_size,
2081 tick_interval,
2082 seed,
2083 partitions,
2084 batch_size,
2085 ..
2086 } = extracted;
2087
2088 let mut include_offset = None;
2089 for im in include_metadata {
2090 match im {
2091 SourceIncludeMetadata::Offset { alias } => {
2092 include_offset = match alias {
2093 Some(alias) => Some(alias.to_string()),
2094 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2095 }
2096 }
2097 SourceIncludeMetadata::Key { .. } => continue,
2098
2099 _ => {
2100 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2101 }
2102 };
2103 }
2104
2105 let lgkv = KeyValueLoadGenerator {
2106 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2107 snapshot_rounds: snapshot_rounds
2108 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2109 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2111 value_size: value_size
2112 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2113 partitions: partitions
2114 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2115 tick_interval,
2116 batch_size: batch_size
2117 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2118 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2119 include_offset,
2120 };
2121
2122 if lgkv.keys == 0
2123 || lgkv.partitions == 0
2124 || lgkv.value_size == 0
2125 || lgkv.batch_size == 0
2126 {
2127 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2128 }
2129
2130 if lgkv.keys % lgkv.partitions != 0 {
2131 sql_bail!("KEYS must be a multiple of PARTITIONS")
2132 }
2133
2134 if lgkv.batch_size > lgkv.keys {
2135 sql_bail!("KEYS must be larger than BATCH SIZE")
2136 }
2137
2138 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2141 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2142 }
2143
2144 if lgkv.snapshot_rounds == 0 {
2145 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2146 }
2147
2148 LoadGenerator::KeyValue(lgkv)
2149 }
2150 };
2151
2152 Ok(load_generator)
2153}
2154
2155fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2156 let before = value_desc.get_by_name(&"before".into());
2157 let (after_idx, after_ty) = value_desc
2158 .get_by_name(&"after".into())
2159 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2160 let before_idx = if let Some((before_idx, before_ty)) = before {
2161 if !matches!(before_ty.scalar_type, ScalarType::Record { .. }) {
2162 sql_bail!("'before' column must be of type record");
2163 }
2164 if before_ty != after_ty {
2165 sql_bail!("'before' type differs from 'after' column");
2166 }
2167 Some(before_idx)
2168 } else {
2169 None
2170 };
2171 Ok((before_idx, after_idx))
2172}
2173
2174fn get_encoding(
2175 scx: &StatementContext,
2176 format: &FormatSpecifier<Aug>,
2177 envelope: &ast::SourceEnvelope,
2178) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2179 let encoding = match format {
2180 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2181 FormatSpecifier::KeyValue { key, value } => {
2182 let key = {
2183 let encoding = get_encoding_inner(scx, key)?;
2184 Some(encoding.key.unwrap_or(encoding.value))
2185 };
2186 let value = get_encoding_inner(scx, value)?.value;
2187 SourceDataEncoding { key, value }
2188 }
2189 };
2190
2191 let requires_keyvalue = matches!(
2192 envelope,
2193 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2194 );
2195 let is_keyvalue = encoding.key.is_some();
2196 if requires_keyvalue && !is_keyvalue {
2197 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2198 };
2199
2200 Ok(encoding)
2201}
2202
2203fn source_sink_cluster_config<'a, 'ctx>(
2209 scx: &'a StatementContext<'ctx>,
2210 in_cluster: &mut Option<ResolvedClusterName>,
2211) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2212 let cluster = match in_cluster {
2213 None => {
2214 let cluster = scx.catalog.resolve_cluster(None)?;
2215 *in_cluster = Some(ResolvedClusterName {
2216 id: cluster.id(),
2217 print_name: None,
2218 });
2219 cluster
2220 }
2221 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2222 };
2223
2224 Ok(cluster)
2225}
2226
// Declares the extracted form of `AvroSchemaOption` (used below as `AvroSchemaOptionExtracted`
// when planning the `WITH` options of an inline Avro schema). Its single option,
// `ConfluentWireFormat`, is a `bool` declared with `Default(true)`.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2228
/// The pieces of an Avro schema definition collected while planning an Avro format, before
/// they are turned into Avro data encodings.
#[derive(Debug)]
pub struct Schema {
    /// Schema for the key, when one is known. Inline schemas never provide one.
    pub key_schema: Option<String>,
    /// Schema for the value.
    pub value_schema: String,
    /// The schema registry connection the schemas came from; `None` for inline schemas.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether the Confluent wire format is in use. Schema-registry-backed schemas always set
    /// this to `true`; inline schemas take it from their options.
    pub confluent_wire_format: bool,
}
2236
2237fn get_encoding_inner(
2238 scx: &StatementContext,
2239 format: &Format<Aug>,
2240) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2241 let value = match format {
2242 Format::Bytes => DataEncoding::Bytes,
2243 Format::Avro(schema) => {
2244 let Schema {
2245 key_schema,
2246 value_schema,
2247 csr_connection,
2248 confluent_wire_format,
2249 } = match schema {
2250 AvroSchema::InlineSchema {
2253 schema: ast::Schema { schema },
2254 with_options,
2255 } => {
2256 let AvroSchemaOptionExtracted {
2257 confluent_wire_format,
2258 ..
2259 } = with_options.clone().try_into()?;
2260
2261 Schema {
2262 key_schema: None,
2263 value_schema: schema.clone(),
2264 csr_connection: None,
2265 confluent_wire_format,
2266 }
2267 }
2268 AvroSchema::Csr {
2269 csr_connection:
2270 CsrConnectionAvro {
2271 connection,
2272 seed,
2273 key_strategy: _,
2274 value_strategy: _,
2275 },
2276 } => {
2277 let item = scx.get_item_by_resolved_name(&connection.connection)?;
2278 let csr_connection = match item.connection()? {
2279 Connection::Csr(_) => item.id(),
2280 _ => {
2281 sql_bail!(
2282 "{} is not a schema registry connection",
2283 scx.catalog
2284 .resolve_full_name(item.name())
2285 .to_string()
2286 .quoted()
2287 )
2288 }
2289 };
2290
2291 if let Some(seed) = seed {
2292 Schema {
2293 key_schema: seed.key_schema.clone(),
2294 value_schema: seed.value_schema.clone(),
2295 csr_connection: Some(csr_connection),
2296 confluent_wire_format: true,
2297 }
2298 } else {
2299 unreachable!("CSR seed resolution should already have been called: Avro")
2300 }
2301 }
2302 };
2303
2304 if let Some(key_schema) = key_schema {
2305 return Ok(SourceDataEncoding {
2306 key: Some(DataEncoding::Avro(AvroEncoding {
2307 schema: key_schema,
2308 csr_connection: csr_connection.clone(),
2309 confluent_wire_format,
2310 })),
2311 value: DataEncoding::Avro(AvroEncoding {
2312 schema: value_schema,
2313 csr_connection,
2314 confluent_wire_format,
2315 }),
2316 });
2317 } else {
2318 DataEncoding::Avro(AvroEncoding {
2319 schema: value_schema,
2320 csr_connection,
2321 confluent_wire_format,
2322 })
2323 }
2324 }
2325 Format::Protobuf(schema) => match schema {
2326 ProtobufSchema::Csr {
2327 csr_connection:
2328 CsrConnectionProtobuf {
2329 connection:
2330 CsrConnection {
2331 connection,
2332 options,
2333 },
2334 seed,
2335 },
2336 } => {
2337 if let Some(CsrSeedProtobuf { key, value }) = seed {
2338 let item = scx.get_item_by_resolved_name(connection)?;
2339 let _ = match item.connection()? {
2340 Connection::Csr(connection) => connection,
2341 _ => {
2342 sql_bail!(
2343 "{} is not a schema registry connection",
2344 scx.catalog
2345 .resolve_full_name(item.name())
2346 .to_string()
2347 .quoted()
2348 )
2349 }
2350 };
2351
2352 if !options.is_empty() {
2353 sql_bail!("Protobuf CSR connections do not support any options");
2354 }
2355
2356 let value = DataEncoding::Protobuf(ProtobufEncoding {
2357 descriptors: strconv::parse_bytes(&value.schema)?,
2358 message_name: value.message_name.clone(),
2359 confluent_wire_format: true,
2360 });
2361 if let Some(key) = key {
2362 return Ok(SourceDataEncoding {
2363 key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2364 descriptors: strconv::parse_bytes(&key.schema)?,
2365 message_name: key.message_name.clone(),
2366 confluent_wire_format: true,
2367 })),
2368 value,
2369 });
2370 }
2371 value
2372 } else {
2373 unreachable!("CSR seed resolution should already have been called: Proto")
2374 }
2375 }
2376 ProtobufSchema::InlineSchema {
2377 message_name,
2378 schema: ast::Schema { schema },
2379 } => {
2380 let descriptors = strconv::parse_bytes(schema)?;
2381
2382 DataEncoding::Protobuf(ProtobufEncoding {
2383 descriptors,
2384 message_name: message_name.to_owned(),
2385 confluent_wire_format: false,
2386 })
2387 }
2388 },
2389 Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2390 regex: mz_repr::adt::regex::Regex::new(regex, false)
2391 .map_err(|e| sql_err!("parsing regex: {e}"))?,
2392 }),
2393 Format::Csv { columns, delimiter } => {
2394 let columns = match columns {
2395 CsvColumns::Header { names } => {
2396 if names.is_empty() {
2397 sql_bail!("[internal error] column spec should get names in purify")
2398 }
2399 ColumnSpec::Header {
2400 names: names.iter().cloned().map(|n| n.into_string()).collect(),
2401 }
2402 }
2403 CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2404 };
2405 DataEncoding::Csv(CsvEncoding {
2406 columns,
2407 delimiter: u8::try_from(*delimiter)
2408 .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2409 })
2410 }
2411 Format::Json { array: false } => DataEncoding::Json,
2412 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2413 Format::Text => DataEncoding::Text,
2414 };
2415 Ok(SourceDataEncoding { key: None, value })
2416}
2417
2418fn get_key_envelope(
2420 included_items: &[SourceIncludeMetadata],
2421 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2422 key_envelope_no_encoding: bool,
2423) -> Result<KeyEnvelope, PlanError> {
2424 let key_definition = included_items
2425 .iter()
2426 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2427 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2428 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2429 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2430 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2431 (Some(name), _) if key_envelope_no_encoding => {
2432 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2433 }
2434 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2435 (_, None) => {
2436 sql_bail!(
2440 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2441 got bare FORMAT"
2442 );
2443 }
2444 }
2445 } else {
2446 Ok(KeyEnvelope::None)
2447 }
2448}
2449
2450fn get_unnamed_key_envelope(
2453 key: Option<&DataEncoding<ReferencedConnection>>,
2454) -> Result<KeyEnvelope, PlanError> {
2455 let is_composite = match key {
2459 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2460 Some(
2461 DataEncoding::Avro(_)
2462 | DataEncoding::Csv(_)
2463 | DataEncoding::Protobuf(_)
2464 | DataEncoding::Regex { .. },
2465 ) => true,
2466 None => false,
2467 };
2468
2469 if is_composite {
2470 Ok(KeyEnvelope::Flattened)
2471 } else {
2472 Ok(KeyEnvelope::Named("key".to_string()))
2473 }
2474}
2475
2476pub fn describe_create_view(
2477 _: &StatementContext,
2478 _: CreateViewStatement<Aug>,
2479) -> Result<StatementDesc, PlanError> {
2480 Ok(StatementDesc::new(None))
2481}
2482
2483pub fn plan_view(
2484 scx: &StatementContext,
2485 def: &mut ViewDefinition<Aug>,
2486 params: &Params,
2487 temporary: bool,
2488) -> Result<(QualifiedItemName, View), PlanError> {
2489 let create_sql = normalize::create_statement(
2490 scx,
2491 Statement::CreateView(CreateViewStatement {
2492 if_exists: IfExistsBehavior::Error,
2493 temporary,
2494 definition: def.clone(),
2495 }),
2496 )?;
2497
2498 let ViewDefinition {
2499 name,
2500 columns,
2501 query,
2502 } = def;
2503
2504 let query::PlannedRootQuery {
2505 mut expr,
2506 mut desc,
2507 finishing,
2508 scope: _,
2509 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2510 assert!(finishing.is_trivial(expr.arity()));
2516
2517 expr.bind_parameters(params)?;
2518 let dependencies = expr
2519 .depends_on()
2520 .into_iter()
2521 .map(|gid| scx.catalog.resolve_item_id(&gid))
2522 .collect();
2523
2524 let name = if temporary {
2525 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2526 } else {
2527 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2528 };
2529
2530 plan_utils::maybe_rename_columns(
2531 format!("view {}", scx.catalog.resolve_full_name(&name)),
2532 &mut desc,
2533 columns,
2534 )?;
2535 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2536
2537 if let Some(dup) = names.iter().duplicates().next() {
2538 sql_bail!("column {} specified more than once", dup.as_str().quoted());
2539 }
2540
2541 let view = View {
2542 create_sql,
2543 expr,
2544 dependencies,
2545 column_names: names,
2546 temporary,
2547 };
2548
2549 Ok((name, view))
2550}
2551
2552pub fn plan_create_view(
2553 scx: &StatementContext,
2554 mut stmt: CreateViewStatement<Aug>,
2555 params: &Params,
2556) -> Result<Plan, PlanError> {
2557 let CreateViewStatement {
2558 temporary,
2559 if_exists,
2560 definition,
2561 } = &mut stmt;
2562 let (name, view) = plan_view(scx, definition, params, *temporary)?;
2563
2564 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2567
2568 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2569 let if_exists = true;
2570 let cascade = false;
2571 let maybe_item_to_drop = plan_drop_item(
2572 scx,
2573 ObjectType::View,
2574 if_exists,
2575 definition.name.clone(),
2576 cascade,
2577 )?;
2578
2579 if let Some(id) = maybe_item_to_drop {
2581 let dependencies = view.expr.depends_on();
2582 let invalid_drop = scx
2583 .get_item(&id)
2584 .global_ids()
2585 .any(|gid| dependencies.contains(&gid));
2586 if invalid_drop {
2587 let item = scx.catalog.get_item(&id);
2588 sql_bail!(
2589 "cannot replace view {0}: depended upon by new {0} definition",
2590 scx.catalog.resolve_full_name(item.name())
2591 );
2592 }
2593
2594 Some(id)
2595 } else {
2596 None
2597 }
2598 } else {
2599 None
2600 };
2601 let drop_ids = replace
2602 .map(|id| {
2603 scx.catalog
2604 .item_dependents(id)
2605 .into_iter()
2606 .map(|id| id.unwrap_item_id())
2607 .collect()
2608 })
2609 .unwrap_or_default();
2610
2611 let full_name = scx.catalog.resolve_full_name(&name);
2613 let partial_name = PartialItemName::from(full_name.clone());
2614 if let (Ok(item), IfExistsBehavior::Error, false) = (
2617 scx.catalog.resolve_item_or_type(&partial_name),
2618 *if_exists,
2619 ignore_if_exists_errors,
2620 ) {
2621 return Err(PlanError::ItemAlreadyExists {
2622 name: full_name.to_string(),
2623 item_type: item.item_type(),
2624 });
2625 }
2626
2627 Ok(Plan::CreateView(CreateViewPlan {
2628 name,
2629 view,
2630 replace,
2631 drop_ids,
2632 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2633 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2634 }))
2635}
2636
2637pub fn describe_create_materialized_view(
2638 _: &StatementContext,
2639 _: CreateMaterializedViewStatement<Aug>,
2640) -> Result<StatementDesc, PlanError> {
2641 Ok(StatementDesc::new(None))
2642}
2643
2644pub fn describe_create_continual_task(
2645 _: &StatementContext,
2646 _: CreateContinualTaskStatement<Aug>,
2647) -> Result<StatementDesc, PlanError> {
2648 Ok(StatementDesc::new(None))
2649}
2650
2651pub fn describe_create_network_policy(
2652 _: &StatementContext,
2653 _: CreateNetworkPolicyStatement<Aug>,
2654) -> Result<StatementDesc, PlanError> {
2655 Ok(StatementDesc::new(None))
2656}
2657
2658pub fn describe_alter_network_policy(
2659 _: &StatementContext,
2660 _: AlterNetworkPolicyStatement<Aug>,
2661) -> Result<StatementDesc, PlanError> {
2662 Ok(StatementDesc::new(None))
2663}
2664
/// Plans a `CREATE MATERIALIZED VIEW` statement into a `Plan::CreateMaterializedView`.
///
/// The steps, in order:
/// 1. Resolve the target cluster and write it back into `stmt.in_cluster` before the statement
///    is normalized into `create_sql`.
/// 2. Plan the query, bind `params`, and apply any explicit column names.
/// 3. Extract the `WITH` options (`ASSERT NOT NULL`, `PARTITION BY`, `RETAIN HISTORY`,
///    `REFRESH`) and turn the `REFRESH` options into an optional `RefreshSchedule`.
/// 4. Resolve the effective if-exists behavior and, for `OR REPLACE`, plan the drop of the
///    existing item and its dependents.
///
/// # Errors
///
/// Besides propagated planning failures, returns an error when
/// * a feature-flagged option is used without the flag being enabled,
/// * a `REFRESH` option is invalid, or `REFRESH ON COMMIT` is repeated or combined with other
///   `REFRESH` options,
/// * an `ASSERT NOT NULL` column is unknown or listed twice,
/// * the output contains duplicate column names,
/// * the new definition depends on the materialized view it would replace, or
/// * an item or type with the same name exists and the effective behavior is
///   `IfExistsBehavior::Error`.
///
/// # Panics
///
/// Panics if planning the query returns a non-trivial finishing.
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
    params: &Params,
) -> Result<Plan, PlanError> {
    // Pin the cluster in the statement first so that the normalized SQL below records it.
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        mut expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // The finishing is expected to be trivial here; anything else is a planner bug.
    assert!(finishing.is_trivial(expr.arity()));

    expr.bind_parameters(params)?;

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    // `PARTITION BY` is gated behind a feature flag and validated against the output columns.
    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    // Fold all `REFRESH` options into a single schedule. The result is `None` when the schedule
    // ends up equal to the default (no `AT`/`EVERY` entries were added).
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            // Every refresh option other than `ON COMMIT` requires the feature flag.
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    // Purification is expected to have rewritten this variant already.
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?; // Desugar the expression.
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &RelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &ScalarType::MzTimestamp,
                    )?;
                    // The expression must reduce to a literal timestamp.
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        // Intervals with a month component are rejected (presumably because
                        // months do not have a fixed length).
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    // The interval must be representable as a `u64` number of milliseconds.
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            // Purification is expected to have supplied `ALIGNED TO` already.
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    // Desugar the expression.
                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &RelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &ScalarType::MzTimestamp,
                    )?;
                    // The expression must reduce to a literal timestamp.
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        // `ON COMMIT` may not be mixed with any option that added to the schedule.
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    // Translate each `ASSERT NOT NULL` column name into its position in the output.
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.as_str().quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!(
            "duplicate column {} in non-null assertions",
            dup.as_str().quoted()
        );
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.as_str().quoted());
    }

    // When the plan context asks to ignore if-exists errors, behave as if the statement had
    // specified `Skip`; otherwise use the statement's own behavior.
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.into(),
                cascade,
            )?;

            // The new definition must not depend on the very item it replaces.
            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    // Everything that depends on the replaced item is dropped along with it.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Reject name collisions with any existing item *or* type.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
2948
// Declares the extracted form of `MaterializedViewOption` (used as
// `MaterializedViewOptionExtracted` when planning `CREATE MATERIALIZED VIEW`).
// `AssertNotNull` and `Refresh` are declared with `AllowMultiple`; `PartitionBy` and
// `RetainHistory` are not.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
2956
2957pub fn plan_create_continual_task(
2958 scx: &StatementContext,
2959 mut stmt: CreateContinualTaskStatement<Aug>,
2960 params: &Params,
2961) -> Result<Plan, PlanError> {
2962 match &stmt.sugar {
2963 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
2964 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
2965 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
2966 }
2967 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
2968 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
2969 }
2970 };
2971 let cluster_id = match &stmt.in_cluster {
2972 None => scx.catalog.resolve_cluster(None)?.id(),
2973 Some(in_cluster) => in_cluster.id,
2974 };
2975 stmt.in_cluster = Some(ResolvedClusterName {
2976 id: cluster_id,
2977 print_name: None,
2978 });
2979
2980 let create_sql =
2981 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
2982
2983 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
2984
2985 let mut desc = match stmt.columns {
2990 None => None,
2991 Some(columns) => {
2992 let mut desc_columns = Vec::with_capacity(columns.capacity());
2993 for col in columns.iter() {
2994 desc_columns.push((
2995 normalize::column_name(col.name.clone()),
2996 ColumnType {
2997 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
2998 nullable: false,
2999 },
3000 ));
3001 }
3002 Some(RelationDesc::from_names_and_types(desc_columns))
3003 }
3004 };
3005 let input = scx.get_item_by_resolved_name(&stmt.input)?;
3006 match input.item_type() {
3007 CatalogItemType::ContinualTask
3010 | CatalogItemType::Table
3011 | CatalogItemType::MaterializedView
3012 | CatalogItemType::Source => {}
3013 CatalogItemType::Sink
3014 | CatalogItemType::View
3015 | CatalogItemType::Index
3016 | CatalogItemType::Type
3017 | CatalogItemType::Func
3018 | CatalogItemType::Secret
3019 | CatalogItemType::Connection => {
3020 sql_bail!(
3021 "CONTINUAL TASK cannot use {} as an input",
3022 input.item_type()
3023 );
3024 }
3025 }
3026
3027 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3028 let ct_name = stmt.name;
3029 let placeholder_id = match &ct_name {
3030 ResolvedItemName::ContinualTask { id, name } => {
3031 let desc = match desc.as_ref().cloned() {
3032 Some(x) => x,
3033 None => {
3034 let input_name = scx.catalog.resolve_full_name(input.name());
3039 input.desc(&input_name)?.into_owned()
3040 }
3041 };
3042 qcx.ctes.insert(
3043 *id,
3044 CteDesc {
3045 name: name.item.clone(),
3046 desc,
3047 },
3048 );
3049 Some(*id)
3050 }
3051 _ => None,
3052 };
3053
3054 let mut exprs = Vec::new();
3055 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3056 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3057 let query::PlannedRootQuery {
3058 mut expr,
3059 desc: desc_query,
3060 finishing,
3061 scope: _,
3062 } = query::plan_ct_query(&mut qcx, query)?;
3063 assert!(finishing.is_trivial(expr.arity()));
3066 expr.bind_parameters(params)?;
3067 let expr = match desc.as_mut() {
3068 None => {
3069 desc = Some(desc_query);
3070 expr
3071 }
3072 Some(desc) => {
3073 if desc_query.arity() > desc.arity() {
3076 sql_bail!(
3077 "statement {}: INSERT has more expressions than target columns",
3078 idx
3079 );
3080 }
3081 if desc_query.arity() < desc.arity() {
3082 sql_bail!(
3083 "statement {}: INSERT has more target columns than expressions",
3084 idx
3085 );
3086 }
3087 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3090 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3091 let expr = expr.map_err(|e| {
3092 sql_err!(
3093 "statement {}: column {} is of type {} but expression is of type {}",
3094 idx,
3095 desc.get_name(e.column).as_str().quoted(),
3096 qcx.humanize_scalar_type(&e.target_type, false),
3097 qcx.humanize_scalar_type(&e.source_type, false),
3098 )
3099 })?;
3100
3101 let zip_types = || desc.iter_types().zip(desc_query.iter_types());
3104 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3105 if updated {
3106 let new_types = zip_types().map(|(ct, q)| {
3107 let mut ct = ct.clone();
3108 if q.nullable {
3109 ct.nullable = true;
3110 }
3111 ct
3112 });
3113 *desc = RelationDesc::from_names_and_types(
3114 desc.iter_names().cloned().zip(new_types),
3115 );
3116 }
3117
3118 expr
3119 }
3120 };
3121 match stmt {
3122 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3123 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3124 }
3125 }
3126 let expr = exprs
3129 .into_iter()
3130 .reduce(|acc, expr| acc.union(expr))
3131 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3132 let dependencies = expr
3133 .depends_on()
3134 .into_iter()
3135 .map(|gid| scx.catalog.resolve_item_id(&gid))
3136 .collect();
3137
3138 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3139 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3140 if let Some(dup) = column_names.iter().duplicates().next() {
3141 sql_bail!("column {} specified more than once", dup.as_str().quoted());
3142 }
3143
3144 let name = match &ct_name {
3146 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3147 ResolvedItemName::ContinualTask { name, .. } => {
3148 let name = scx.allocate_qualified_name(name.clone())?;
3149 let full_name = scx.catalog.resolve_full_name(&name);
3150 let partial_name = PartialItemName::from(full_name.clone());
3151 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3154 return Err(PlanError::ItemAlreadyExists {
3155 name: full_name.to_string(),
3156 item_type: item.item_type(),
3157 });
3158 }
3159 name
3160 }
3161 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3162 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3163 };
3164
3165 let as_of = stmt.as_of.map(Timestamp::from);
3166 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3167 name,
3168 placeholder_id,
3169 desc,
3170 input_id: input.global_id(),
3171 with_snapshot: snapshot.unwrap_or(true),
3172 continual_task: MaterializedView {
3173 create_sql,
3174 expr,
3175 dependencies,
3176 column_names,
3177 cluster_id,
3178 non_null_assertions: Vec::new(),
3179 compaction_window: None,
3180 refresh_schedule: None,
3181 as_of,
3182 },
3183 }))
3184}
3185
3186fn continual_task_query<'a>(
3187 ct_name: &ResolvedItemName,
3188 stmt: &'a ast::ContinualTaskStmt<Aug>,
3189) -> Option<ast::Query<Aug>> {
3190 match stmt {
3191 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3192 table_name: _,
3193 columns,
3194 source,
3195 returning,
3196 }) => {
3197 if !columns.is_empty() || !returning.is_empty() {
3198 return None;
3199 }
3200 match source {
3201 ast::InsertSource::Query(query) => Some(query.clone()),
3202 ast::InsertSource::DefaultValues => None,
3203 }
3204 }
3205 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3206 table_name: _,
3207 alias,
3208 using,
3209 selection,
3210 }) => {
3211 if !using.is_empty() {
3212 return None;
3213 }
3214 let from = ast::TableWithJoins {
3216 relation: ast::TableFactor::Table {
3217 name: ct_name.clone(),
3218 alias: alias.clone(),
3219 },
3220 joins: Vec::new(),
3221 };
3222 let select = ast::Select {
3223 from: vec![from],
3224 selection: selection.clone(),
3225 distinct: None,
3226 projection: vec![ast::SelectItem::Wildcard],
3227 group_by: Vec::new(),
3228 having: None,
3229 qualify: None,
3230 options: Vec::new(),
3231 };
3232 let query = ast::Query {
3233 ctes: ast::CteBlock::Simple(Vec::new()),
3234 body: ast::SetExpr::Select(Box::new(select)),
3235 order_by: Vec::new(),
3236 limit: None,
3237 offset: None,
3238 };
3239 Some(query)
3241 }
3242 }
3243}
3244
// Declares the extracted form of `ContinualTaskOption` (used as `ContinualTaskOptionExtracted`
// when planning `CREATE CONTINUAL TASK`). Its single option, `Snapshot`, is a `bool` with no
// declared default.
generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3246
3247pub fn describe_create_sink(
3248 _: &StatementContext,
3249 _: CreateSinkStatement<Aug>,
3250) -> Result<StatementDesc, PlanError> {
3251 Ok(StatementDesc::new(None))
3252}
3253
// Declares the options of `CreateSinkOption` for extraction: `Snapshot` (`bool`),
// `PartitionStrategy` (`String`) and `Version` (`u64`), none with a declared default.
// NOTE(review): presumably this generates `CreateSinkOptionExtracted`, following the naming of
// the other `generate_extracted_config!` invocations in this file — confirm at the use site.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64)
);
3260
3261pub fn plan_create_sink(
3262 scx: &StatementContext,
3263 stmt: CreateSinkStatement<Aug>,
3264) -> Result<Plan, PlanError> {
3265 let Some(name) = stmt.name.clone() else {
3267 return Err(PlanError::MissingName(CatalogItemType::Sink));
3268 };
3269 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3270 let full_name = scx.catalog.resolve_full_name(&name);
3271 let partial_name = PartialItemName::from(full_name.clone());
3272 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3273 return Err(PlanError::ItemAlreadyExists {
3274 name: full_name.to_string(),
3275 item_type: item.item_type(),
3276 });
3277 }
3278
3279 plan_sink(scx, stmt)
3280}
3281
3282fn plan_sink(
3287 scx: &StatementContext,
3288 mut stmt: CreateSinkStatement<Aug>,
3289) -> Result<Plan, PlanError> {
3290 let CreateSinkStatement {
3291 name,
3292 in_cluster: _,
3293 from,
3294 connection,
3295 format,
3296 envelope,
3297 if_not_exists,
3298 with_options,
3299 } = stmt.clone();
3300
3301 let Some(name) = name else {
3302 return Err(PlanError::MissingName(CatalogItemType::Sink));
3303 };
3304 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3305
3306 let envelope = match envelope {
3307 Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3308 Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3309 None => sql_bail!("ENVELOPE clause is required"),
3310 };
3311
3312 let from_name = &from;
3313 let from = scx.get_item_by_resolved_name(&from)?;
3314 if from.id().is_system() {
3315 bail_unsupported!("creating a sink directly on a catalog object");
3316 }
3317 let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
3318 let key_indices = match &connection {
3319 CreateSinkConnection::Kafka { key, .. } => {
3320 if let Some(key) = key.clone() {
3321 let key_columns = key
3322 .key_columns
3323 .into_iter()
3324 .map(normalize::column_name)
3325 .collect::<Vec<_>>();
3326 let mut uniq = BTreeSet::new();
3327 for col in key_columns.iter() {
3328 if !uniq.insert(col) {
3329 sql_bail!("duplicate column referenced in KEY: {}", col);
3330 }
3331 }
3332 let indices = key_columns
3333 .iter()
3334 .map(|col| -> anyhow::Result<usize> {
3335 let name_idx =
3336 desc.get_by_name(col)
3337 .map(|(idx, _type)| idx)
3338 .ok_or_else(|| {
3339 sql_err!("column referenced in KEY does not exist: {}", col)
3340 })?;
3341 if desc.get_unambiguous_name(name_idx).is_none() {
3342 sql_err!("column referenced in KEY is ambiguous: {}", col);
3343 }
3344 Ok(name_idx)
3345 })
3346 .collect::<Result<Vec<_>, _>>()?;
3347 let is_valid_key =
3348 desc.typ().keys.iter().any(|key_columns| {
3349 key_columns.iter().all(|column| indices.contains(column))
3350 });
3351
3352 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3353 if key.not_enforced {
3354 scx.catalog
3355 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3356 key: key_columns.clone(),
3357 name: name.item.clone(),
3358 })
3359 } else {
3360 return Err(PlanError::UpsertSinkWithInvalidKey {
3361 name: from_name.full_name_str(),
3362 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3363 valid_keys: desc
3364 .typ()
3365 .keys
3366 .iter()
3367 .map(|key| {
3368 key.iter()
3369 .map(|col| desc.get_name(*col).as_str().into())
3370 .collect()
3371 })
3372 .collect(),
3373 });
3374 }
3375 }
3376 Some(indices)
3377 } else {
3378 None
3379 }
3380 }
3381 };
3382
3383 let headers_index = match &connection {
3384 CreateSinkConnection::Kafka {
3385 headers: Some(headers),
3386 ..
3387 } => {
3388 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3389
3390 match envelope {
3391 SinkEnvelope::Upsert => (),
3392 SinkEnvelope::Debezium => {
3393 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3394 }
3395 };
3396
3397 let headers = normalize::column_name(headers.clone());
3398 let (idx, ty) = desc
3399 .get_by_name(&headers)
3400 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3401
3402 if desc.get_unambiguous_name(idx).is_none() {
3403 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3404 }
3405
3406 match &ty.scalar_type {
3407 ScalarType::Map { value_type, .. }
3408 if matches!(&**value_type, ScalarType::String | ScalarType::Bytes) => {}
3409 _ => sql_bail!(
3410 "HEADERS column must have type map[text => text] or map[text => bytea]"
3411 ),
3412 }
3413
3414 Some(idx)
3415 }
3416 _ => None,
3417 };
3418
3419 let relation_key_indices = desc.typ().keys.get(0).cloned();
3421
3422 let key_desc_and_indices = key_indices.map(|key_indices| {
3423 let cols = desc
3424 .iter()
3425 .map(|(name, ty)| (name.clone(), ty.clone()))
3426 .collect::<Vec<_>>();
3427 let (names, types): (Vec<_>, Vec<_>) =
3428 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3429 let typ = RelationType::new(types);
3430 (RelationDesc::new(typ, names), key_indices)
3431 });
3432
3433 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3434 return Err(PlanError::UpsertSinkWithoutKey);
3435 }
3436
3437 let connection_builder = match connection {
3438 CreateSinkConnection::Kafka {
3439 connection,
3440 options,
3441 ..
3442 } => kafka_sink_builder(
3443 scx,
3444 connection,
3445 options,
3446 format,
3447 relation_key_indices,
3448 key_desc_and_indices,
3449 headers_index,
3450 desc.into_owned(),
3451 envelope,
3452 from.id(),
3453 )?,
3454 };
3455
3456 let CreateSinkOptionExtracted {
3457 snapshot,
3458 version,
3459 partition_strategy: _,
3460 seen: _,
3461 } = with_options.try_into()?;
3462
3463 let with_snapshot = snapshot.unwrap_or(true);
3465 let version = version.unwrap_or(0);
3467
3468 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3472 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3473
3474 Ok(Plan::CreateSink(CreateSinkPlan {
3475 name,
3476 sink: Sink {
3477 create_sql,
3478 from: from.global_id(),
3479 connection: connection_builder,
3480 envelope,
3481 version,
3482 },
3483 with_snapshot,
3484 if_not_exists,
3485 in_cluster: in_cluster.id(),
3486 }))
3487}
3488
3489fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3490 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3491
3492 let existing_keys = desc
3493 .typ()
3494 .keys
3495 .iter()
3496 .map(|key_columns| {
3497 key_columns
3498 .iter()
3499 .map(|col| desc.get_name(*col).as_str())
3500 .join(", ")
3501 })
3502 .join(", ");
3503
3504 sql_err!(
3505 "Key constraint ({}) conflicts with existing key ({})",
3506 user_keys,
3507 existing_keys
3508 )
3509}
3510
/// Typed view of a list of `CsrConfigOption`s.
///
/// Built by the hand-written `TryFrom<Vec<CsrConfigOption<Aug>>>` implementation below, which
/// fills one field per recognized option name. Fields whose option was not supplied keep their
/// `Default` value.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    /// Option names encountered so far; used to reject an option that is given twice.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    /// Value of the `AvroKeyFullname` option, if given.
    pub(crate) avro_key_fullname: Option<String>,
    /// Value of the `AvroValueFullname` option, if given.
    pub(crate) avro_value_fullname: Option<String>,
    /// Value of the `NullDefaults` option; `false` when the option is absent.
    pub(crate) null_defaults: bool,
    /// Doc comments that apply to the value schema, keyed by the documented target.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    /// Doc comments that apply to the key schema, keyed by the documented target.
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    /// Value of the `KeyCompatibilityLevel` option, if given.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    /// Value of the `ValueCompatibilityLevel` option, if given.
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3524
3525impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3526 type Error = crate::plan::PlanError;
3527 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3528 let mut extracted = CsrConfigOptionExtracted::default();
3529 let mut common_doc_comments = BTreeMap::new();
3530 for option in v {
3531 if !extracted.seen.insert(option.name.clone()) {
3532 return Err(PlanError::Unstructured({
3533 format!("{} specified more than once", option.name)
3534 }));
3535 }
3536 let option_name = option.name.clone();
3537 let option_name_str = option_name.to_ast_string_simple();
3538 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3539 option_name: option_name.to_ast_string_simple(),
3540 err: e.into(),
3541 };
3542 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3543 val.map(|s| match s {
3544 WithOptionValue::Value(Value::String(s)) => {
3545 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3546 }
3547 _ => Err("must be a string".to_string()),
3548 })
3549 .transpose()
3550 .map_err(PlanError::Unstructured)
3551 .map_err(better_error)
3552 };
3553 match option.name {
3554 CsrConfigOptionName::AvroKeyFullname => {
3555 extracted.avro_key_fullname =
3556 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3557 }
3558 CsrConfigOptionName::AvroValueFullname => {
3559 extracted.avro_value_fullname =
3560 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3561 }
3562 CsrConfigOptionName::NullDefaults => {
3563 extracted.null_defaults =
3564 <bool>::try_from_value(option.value).map_err(better_error)?;
3565 }
3566 CsrConfigOptionName::AvroDocOn(doc_on) => {
3567 let value = String::try_from_value(option.value.ok_or_else(|| {
3568 PlanError::InvalidOptionValue {
3569 option_name: option_name_str,
3570 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3571 }
3572 })?)
3573 .map_err(better_error)?;
3574 let key = match doc_on.identifier {
3575 DocOnIdentifier::Column(ast::ColumnName {
3576 relation: ResolvedItemName::Item { id, .. },
3577 column: ResolvedColumnReference::Column { name, index: _ },
3578 }) => DocTarget::Field {
3579 object_id: id,
3580 column_name: name,
3581 },
3582 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3583 DocTarget::Type(id)
3584 }
3585 _ => unreachable!(),
3586 };
3587
3588 match doc_on.for_schema {
3589 DocOnSchema::KeyOnly => {
3590 extracted.key_doc_options.insert(key, value);
3591 }
3592 DocOnSchema::ValueOnly => {
3593 extracted.value_doc_options.insert(key, value);
3594 }
3595 DocOnSchema::All => {
3596 common_doc_comments.insert(key, value);
3597 }
3598 }
3599 }
3600 CsrConfigOptionName::KeyCompatibilityLevel => {
3601 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3602 }
3603 CsrConfigOptionName::ValueCompatibilityLevel => {
3604 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3605 }
3606 }
3607 }
3608
3609 for (key, value) in common_doc_comments {
3610 if !extracted.key_doc_options.contains_key(&key) {
3611 extracted.key_doc_options.insert(key.clone(), value.clone());
3612 }
3613 if !extracted.value_doc_options.contains_key(&key) {
3614 extracted.value_doc_options.insert(key, value);
3615 }
3616 }
3617 Ok(extracted)
3618 }
3619}
3620
/// Builds the `StorageSinkConnection::Kafka` description for a Kafka sink.
///
/// Validates the referenced connection and the `KafkaSinkConfigOption`s, maps the requested
/// key/value formats to `KafkaSinkFormatType`s (generating Avro schemas where needed), and plans
/// the optional `PARTITION BY` expression against `value_desc`.
///
/// # Errors
///
/// Returns an error when, among other things:
/// * `connection` does not resolve to a Kafka connection;
/// * `LEGACY IDS` is combined with a transactional-id or progress-group-id prefix;
/// * no topic is given;
/// * the topic metadata refresh interval exceeds one hour;
/// * the topic partition count or replication factor is not positive;
/// * the format is missing or not supported for sinks.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // The sink must point at a Kafka connection; anything else is rejected up front.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // `LEGACY IDS` and an explicit prefix are mutually exclusive; without `LEGACY IDS` the
    // (possibly absent) prefix is used.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    // Same rule as above, applied to the progress group id.
    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    // Rejects zero and negative values, then wraps the value in `NonNeg`. `None` passes through.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Validates a schema-registry connection clause for use in a sink and returns the id of the
    // registry connection together with its extracted options.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        // A key schema name is meaningless when the sink has no key.
        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // With a key present, the two schema names must be given together or not at all.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Translates one AST format into the sink format for either the key (`is_key == true`) or
    // the value, checking it against the shape of `desc`.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        // BYTES and TEXT are only accepted for single-column relations.
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // Key and value schemas differ in their doc comments, their default full name
            // ("row" vs. "envelope"), and the flags passed to the generator.
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional PARTITION BY expression over the columns of the value relation.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    // Under ENVELOPE DEBEZIUM only key columns may be referenced; every other
                    // column is marked so that referencing it produces an error.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression is cast to `UInt64` using assignment-cast rules.
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &ScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated()?;

            Some(expr)
        }
        _ => None,
    };

    // Resolve the key and value formats. A key format is only produced when the sink has a key.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        // A bare format applies to both the key (if any) and the value.
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
3901
3902pub fn describe_create_index(
3903 _: &StatementContext,
3904 _: CreateIndexStatement<Aug>,
3905) -> Result<StatementDesc, PlanError> {
3906 Ok(StatementDesc::new(None))
3907}
3908
3909pub fn plan_create_index(
3910 scx: &StatementContext,
3911 mut stmt: CreateIndexStatement<Aug>,
3912) -> Result<Plan, PlanError> {
3913 let CreateIndexStatement {
3914 name,
3915 on_name,
3916 in_cluster,
3917 key_parts,
3918 with_options,
3919 if_not_exists,
3920 } = &mut stmt;
3921 let on = scx.get_item_by_resolved_name(on_name)?;
3922 let on_item_type = on.item_type();
3923
3924 if !matches!(
3925 on_item_type,
3926 CatalogItemType::View
3927 | CatalogItemType::MaterializedView
3928 | CatalogItemType::Source
3929 | CatalogItemType::Table
3930 ) {
3931 sql_bail!(
3932 "index cannot be created on {} because it is a {}",
3933 on_name.full_name_str(),
3934 on.item_type()
3935 )
3936 }
3937
3938 let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
3939
3940 let filled_key_parts = match key_parts {
3941 Some(kp) => kp.to_vec(),
3942 None => {
3943 on.desc(&scx.catalog.resolve_full_name(on.name()))?
3945 .typ()
3946 .default_key()
3947 .iter()
3948 .map(|i| match on_desc.get_unambiguous_name(*i) {
3949 Some(n) => Expr::Identifier(vec![n.clone().into()]),
3950 _ => Expr::Value(Value::Number((i + 1).to_string())),
3951 })
3952 .collect()
3953 }
3954 };
3955 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
3956
3957 let index_name = if let Some(name) = name {
3958 QualifiedItemName {
3959 qualifiers: on.name().qualifiers.clone(),
3960 item: normalize::ident(name.clone()),
3961 }
3962 } else {
3963 let mut idx_name = QualifiedItemName {
3964 qualifiers: on.name().qualifiers.clone(),
3965 item: on.name().item.clone(),
3966 };
3967 if key_parts.is_none() {
3968 idx_name.item += "_primary_idx";
3970 } else {
3971 let index_name_col_suffix = keys
3974 .iter()
3975 .map(|k| match k {
3976 mz_expr::MirScalarExpr::Column(i) => match on_desc.get_unambiguous_name(*i) {
3977 Some(col_name) => col_name.to_string(),
3978 None => format!("{}", i + 1),
3979 },
3980 _ => "expr".to_string(),
3981 })
3982 .join("_");
3983 write!(idx_name.item, "_{index_name_col_suffix}_idx")
3984 .expect("write on strings cannot fail");
3985 idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
3986 }
3987
3988 if !*if_not_exists {
3989 scx.catalog.find_available_name(idx_name)
3990 } else {
3991 idx_name
3992 }
3993 };
3994
3995 let full_name = scx.catalog.resolve_full_name(&index_name);
3997 let partial_name = PartialItemName::from(full_name.clone());
3998 if let (Ok(item), false, false) = (
4006 scx.catalog.resolve_item_or_type(&partial_name),
4007 *if_not_exists,
4008 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4009 ) {
4010 return Err(PlanError::ItemAlreadyExists {
4011 name: full_name.to_string(),
4012 item_type: item.item_type(),
4013 });
4014 }
4015
4016 let options = plan_index_options(scx, with_options.clone())?;
4017 let cluster_id = match in_cluster {
4018 None => scx.resolve_cluster(None)?.id(),
4019 Some(in_cluster) => in_cluster.id,
4020 };
4021
4022 *in_cluster = Some(ResolvedClusterName {
4023 id: cluster_id,
4024 print_name: None,
4025 });
4026
4027 *name = Some(Ident::new(index_name.item.clone())?);
4029 *key_parts = Some(filled_key_parts);
4030 let if_not_exists = *if_not_exists;
4031
4032 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4033 let compaction_window = options.iter().find_map(|o| {
4034 #[allow(irrefutable_let_patterns)]
4035 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4036 Some(lcw.clone())
4037 } else {
4038 None
4039 }
4040 });
4041
4042 Ok(Plan::CreateIndex(CreateIndexPlan {
4043 name: index_name,
4044 index: Index {
4045 create_sql,
4046 on: on.global_id(),
4047 keys,
4048 cluster_id,
4049 compaction_window,
4050 },
4051 if_not_exists,
4052 }))
4053}
4054
4055pub fn describe_create_type(
4056 _: &StatementContext,
4057 _: CreateTypeStatement<Aug>,
4058) -> Result<StatementDesc, PlanError> {
4059 Ok(StatementDesc::new(None))
4060}
4061
4062pub fn plan_create_type(
4063 scx: &StatementContext,
4064 stmt: CreateTypeStatement<Aug>,
4065) -> Result<Plan, PlanError> {
4066 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4067 let CreateTypeStatement { name, as_type, .. } = stmt;
4068
4069 fn validate_data_type(
4070 scx: &StatementContext,
4071 data_type: ResolvedDataType,
4072 as_type: &str,
4073 key: &str,
4074 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4075 let (id, modifiers) = match data_type {
4076 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4077 _ => sql_bail!(
4078 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4079 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4080 as_type,
4081 key,
4082 data_type.human_readable_name(),
4083 ),
4084 };
4085
4086 let item = scx.catalog.get_item(&id);
4087 match item.type_details() {
4088 None => sql_bail!(
4089 "{} must be of class type, but received {} which is of class {}",
4090 key,
4091 scx.catalog.resolve_full_name(item.name()),
4092 item.item_type()
4093 ),
4094 Some(CatalogTypeDetails {
4095 typ: CatalogType::Char,
4096 ..
4097 }) => {
4098 bail_unsupported!("embedding char type in a list or map")
4099 }
4100 _ => {
4101 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4103
4104 Ok((id, modifiers))
4105 }
4106 }
4107 }
4108
4109 let inner = match as_type {
4110 CreateTypeAs::List { options } => {
4111 let CreateTypeListOptionExtracted {
4112 element_type,
4113 seen: _,
4114 } = CreateTypeListOptionExtracted::try_from(options)?;
4115 let element_type =
4116 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4117 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4118 CatalogType::List {
4119 element_reference: id,
4120 element_modifiers: modifiers,
4121 }
4122 }
4123 CreateTypeAs::Map { options } => {
4124 let CreateTypeMapOptionExtracted {
4125 key_type,
4126 value_type,
4127 seen: _,
4128 } = CreateTypeMapOptionExtracted::try_from(options)?;
4129 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4130 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4131 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4132 let (value_id, value_modifiers) =
4133 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4134 CatalogType::Map {
4135 key_reference: key_id,
4136 key_modifiers,
4137 value_reference: value_id,
4138 value_modifiers,
4139 }
4140 }
4141 CreateTypeAs::Record { column_defs } => {
4142 let mut fields = vec![];
4143 for column_def in column_defs {
4144 let data_type = column_def.data_type;
4145 let key = ident(column_def.name.clone());
4146 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4147 fields.push(CatalogRecordField {
4148 name: ColumnName::from(key.clone()),
4149 type_reference: id,
4150 type_modifiers: modifiers,
4151 });
4152 }
4153 CatalogType::Record { fields }
4154 }
4155 };
4156
4157 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4158
4159 let full_name = scx.catalog.resolve_full_name(&name);
4161 let partial_name = PartialItemName::from(full_name.clone());
4162 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4165 if item.item_type().conflicts_with_type() {
4166 return Err(PlanError::ItemAlreadyExists {
4167 name: full_name.to_string(),
4168 item_type: item.item_type(),
4169 });
4170 }
4171 }
4172
4173 Ok(Plan::CreateType(CreateTypePlan {
4174 name,
4175 typ: Type { create_sql, inner },
4176 }))
4177}
4178
// Generates `CreateTypeListOptionExtracted`, which `plan_create_type` destructures into
// `element_type` (an optional `ResolvedDataType`) and `seen`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Generates `CreateTypeMapOptionExtracted`, which `plan_create_type` destructures into
// `key_type`, `value_type` (both optional `ResolvedDataType`s) and `seen`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4186
/// A planned `ALTER ROLE` change: either a set of role attributes or a single role variable.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Attribute changes, as produced by `plan_role_attributes`.
    Attributes(PlannedRoleAttributes),
    /// A variable being set or reset, as produced by `plan_role_variable`.
    Variable(PlannedRoleVariable),
}
4192
/// Role attributes collected from a list of `RoleAttribute` options by `plan_role_attributes`.
///
/// Each field is `None` when the corresponding option was not given.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// `Some(true)` for `Inherit`, `Some(false)` for `NoInherit`.
    pub inherit: Option<bool>,
    /// The password supplied with a `Password` option that carries a value.
    pub password: Option<Password>,
    /// `Some(true)` when a `Password` option was given without a value.
    pub nopassword: Option<bool>,
    /// `Some(true)` for `SuperUser`, `Some(false)` for `NoSuperUser`.
    pub superuser: Option<bool>,
    /// `Some(true)` for `Login`, `Some(false)` for `NoLogin`.
    pub login: Option<bool>,
}
4204
4205fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4206 let mut planned_attributes = PlannedRoleAttributes {
4207 inherit: None,
4208 password: None,
4209 superuser: None,
4210 login: None,
4211 nopassword: None,
4212 };
4213
4214 for option in options {
4215 match option {
4216 RoleAttribute::Inherit | RoleAttribute::NoInherit
4217 if planned_attributes.inherit.is_some() =>
4218 {
4219 sql_bail!("conflicting or redundant options");
4220 }
4221 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4222 bail_never_supported!(
4223 "CREATECLUSTER attribute",
4224 "sql/create-role/#details",
4225 "Use system privileges instead."
4226 );
4227 }
4228 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4229 bail_never_supported!(
4230 "CREATEDB attribute",
4231 "sql/create-role/#details",
4232 "Use system privileges instead."
4233 );
4234 }
4235 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4236 bail_never_supported!(
4237 "CREATEROLE attribute",
4238 "sql/create-role/#details",
4239 "Use system privileges instead."
4240 );
4241 }
4242 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4243 sql_bail!("conflicting or redundant options");
4244 }
4245
4246 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4247 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4248 RoleAttribute::Password(password) => {
4249 if let Some(password) = password {
4250 planned_attributes.password = Some(password.into());
4251 } else {
4252 planned_attributes.nopassword = Some(true);
4253 }
4254 }
4255 RoleAttribute::SuperUser => {
4256 if planned_attributes.superuser == Some(false) {
4257 sql_bail!("conflicting or redundant options");
4258 }
4259 planned_attributes.superuser = Some(true);
4260 }
4261 RoleAttribute::NoSuperUser => {
4262 if planned_attributes.superuser == Some(true) {
4263 sql_bail!("conflicting or redundant options");
4264 }
4265 planned_attributes.superuser = Some(false);
4266 }
4267 RoleAttribute::Login => {
4268 if planned_attributes.login == Some(false) {
4269 sql_bail!("conflicting or redundant options");
4270 }
4271 planned_attributes.login = Some(true);
4272 }
4273 RoleAttribute::NoLogin => {
4274 if planned_attributes.login == Some(true) {
4275 sql_bail!("conflicting or redundant options");
4276 }
4277 planned_attributes.login = Some(false);
4278 }
4279 }
4280 }
4281 if planned_attributes.inherit == Some(false) {
4282 bail_unsupported!("non inherit roles");
4283 }
4284
4285 Ok(planned_attributes)
4286}
4287
4288#[derive(Debug)]
4289pub enum PlannedRoleVariable {
4290 Set { name: String, value: VariableValue },
4291 Reset { name: String },
4292}
4293
4294impl PlannedRoleVariable {
4295 pub fn name(&self) -> &str {
4296 match self {
4297 PlannedRoleVariable::Set { name, .. } => name,
4298 PlannedRoleVariable::Reset { name } => name,
4299 }
4300 }
4301}
4302
4303fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4304 let plan = match variable {
4305 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4306 name: name.to_string(),
4307 value: scl::plan_set_variable_to(value)?,
4308 },
4309 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4310 name: name.to_string(),
4311 },
4312 };
4313 Ok(plan)
4314}
4315
4316pub fn describe_create_role(
4317 _: &StatementContext,
4318 _: CreateRoleStatement,
4319) -> Result<StatementDesc, PlanError> {
4320 Ok(StatementDesc::new(None))
4321}
4322
4323pub fn plan_create_role(
4324 _: &StatementContext,
4325 CreateRoleStatement { name, options }: CreateRoleStatement,
4326) -> Result<Plan, PlanError> {
4327 let attributes = plan_role_attributes(options)?;
4328 Ok(Plan::CreateRole(CreateRolePlan {
4329 name: normalize::ident(name),
4330 attributes: attributes.into(),
4331 }))
4332}
4333
4334pub fn plan_create_network_policy(
4335 ctx: &StatementContext,
4336 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4337) -> Result<Plan, PlanError> {
4338 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4339 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4340
4341 let Some(rule_defs) = policy_options.rules else {
4342 sql_bail!("RULES must be specified when creating network policies.");
4343 };
4344
4345 let mut rules = vec![];
4346 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4347 let NetworkPolicyRuleOptionExtracted {
4348 seen: _,
4349 direction,
4350 action,
4351 address,
4352 } = options.try_into()?;
4353 let (direction, action, address) = match (direction, action, address) {
4354 (Some(direction), Some(action), Some(address)) => (
4355 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4356 NetworkPolicyRuleAction::try_from(action.as_str())?,
4357 PolicyAddress::try_from(address.as_str())?,
4358 ),
4359 (_, _, _) => {
4360 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4361 }
4362 };
4363 rules.push(NetworkPolicyRule {
4364 name: normalize::ident(name),
4365 direction,
4366 action,
4367 address,
4368 });
4369 }
4370
4371 if rules.len()
4372 > ctx
4373 .catalog
4374 .system_vars()
4375 .max_rules_per_network_policy()
4376 .try_into()?
4377 {
4378 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4379 }
4380
4381 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4382 name: normalize::ident(name),
4383 rules,
4384 }))
4385}
4386
4387pub fn plan_alter_network_policy(
4388 ctx: &StatementContext,
4389 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4390) -> Result<Plan, PlanError> {
4391 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4392
4393 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4394 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4395
4396 let Some(rule_defs) = policy_options.rules else {
4397 sql_bail!("RULES must be specified when creating network policies.");
4398 };
4399
4400 let mut rules = vec![];
4401 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4402 let NetworkPolicyRuleOptionExtracted {
4403 seen: _,
4404 direction,
4405 action,
4406 address,
4407 } = options.try_into()?;
4408
4409 let (direction, action, address) = match (direction, action, address) {
4410 (Some(direction), Some(action), Some(address)) => (
4411 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4412 NetworkPolicyRuleAction::try_from(action.as_str())?,
4413 PolicyAddress::try_from(address.as_str())?,
4414 ),
4415 (_, _, _) => {
4416 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4417 }
4418 };
4419 rules.push(NetworkPolicyRule {
4420 name: normalize::ident(name),
4421 direction,
4422 action,
4423 address,
4424 });
4425 }
4426 if rules.len()
4427 > ctx
4428 .catalog
4429 .system_vars()
4430 .max_rules_per_network_policy()
4431 .try_into()?
4432 {
4433 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4434 }
4435
4436 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4437 id: policy.id(),
4438 name: normalize::ident(name),
4439 rules,
4440 }))
4441}
4442
4443pub fn describe_create_cluster(
4444 _: &StatementContext,
4445 _: CreateClusterStatement<Aug>,
4446) -> Result<StatementDesc, PlanError> {
4447 Ok(StatementDesc::new(None))
4448}
4449
// Options of `CREATE CLUSTER`. This invocation yields `ClusterOptionExtracted`,
// which `plan_create_cluster_inner` obtains via `try_into` and destructures
// into snake_case fields (plus a `seen` field), and which
// `unplan_create_cluster` turns back into option values with `into_values`.
// Options without a `Default(..)` are surfaced there as `Option<T>`.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Options of a network policy statement: only `RULES`, a list of rule
// definitions. Yields `NetworkPolicyOptionExtracted`.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Options of a single network policy rule. All three are extracted as strings
// and parsed into their typed forms by the planner. Yields
// `NetworkPolicyRuleOptionExtracted`.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// Options attached to an `ALTER CLUSTER` action: only `WAIT`.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// Sub-options for the "until ready" form of a cluster alteration: a timeout
// and the name of the action to take on timeout.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Per-cluster optimizer feature overrides (the `FEATURES` clause of
// `CREATE CLUSTER`). Every feature defaults to `None`, i.e. "no override";
// the extracted values are copied into `OptimizerFeatureOverrides` by
// `plan_create_cluster_inner`.
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4503
4504pub fn plan_create_cluster(
4508 scx: &StatementContext,
4509 stmt: CreateClusterStatement<Aug>,
4510) -> Result<Plan, PlanError> {
4511 let plan = plan_create_cluster_inner(scx, stmt)?;
4512
4513 if let CreateClusterVariant::Managed(_) = &plan.variant {
4515 let stmt = unplan_create_cluster(scx, plan.clone())
4516 .map_err(|e| PlanError::Replan(e.to_string()))?;
4517 let create_sql = stmt.to_ast_string_stable();
4518 let stmt = parse::parse(&create_sql)
4519 .map_err(|e| PlanError::Replan(e.to_string()))?
4520 .into_element()
4521 .ast;
4522 let (stmt, _resolved_ids) =
4523 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4524 let stmt = match stmt {
4525 Statement::CreateCluster(stmt) => stmt,
4526 stmt => {
4527 return Err(PlanError::Replan(format!(
4528 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4529 )));
4530 }
4531 };
4532 let replan =
4533 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4534 if plan != replan {
4535 return Err(PlanError::Replan(format!(
4536 "replan does not match: plan={plan:?}, replan={replan:?}"
4537 )));
4538 }
4539 }
4540
4541 Ok(Plan::CreateCluster(plan))
4542}
4543
/// Builds a [`CreateClusterPlan`] from a `CREATE CLUSTER` statement.
///
/// A cluster is treated as managed when `MANAGED` is given as true, or when
/// `MANAGED` is omitted and no `REPLICAS` option is present. Managed and
/// unmanaged clusters accept disjoint sets of options; an option that does not
/// belong to the selected kind is rejected.
///
/// # Errors
///
/// Returns an error when option extraction fails, when a non-system role uses
/// `FEATURES` or `WORKLOAD CLASS`, when a required option is missing or an
/// unsupported option is present for the cluster kind, when a required feature
/// flag is disabled, or when a nested planning step fails.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk: disk_in,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Without an explicit MANAGED option, the presence of REPLICAS decides.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system roles.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        let mut disk_default = scx.catalog.system_vars().disk_cluster_replicas_default();
        // For sizes the catalog reports as "cc", disk is forced on: explicitly
        // disabling it is an error, and the default becomes `true`.
        if scx.catalog.is_cluster_size_cc(&size) {
            if disk_in == Some(false) {
                sql_bail!(
                    "DISK option disabled is not supported for non-legacy cluster sizes because disk is always enabled"
                );
            }
            disk_default = true;
        }
        // The feature flag is only required when the user asks for something
        // other than the default.
        if matches!(disk_in, Some(disk) if disk != disk_default) {
            scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
        }
        let disk = disk_in.unwrap_or(disk_default);

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // With a non-MANUAL schedule the plan starts at replication factor 0.
            // NOTE(review): presumably the scheduler raises it later -- confirm.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Copy the extracted feature options one-to-one into the optimizer
        // overrides; every other override keeps its default.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                disk,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        // Every managed-only option is rejected for unmanaged clusters.
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk_in.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        // Plan each explicitly listed replica.
        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
4719
/// Converts a [`CreateClusterPlan`] back into a `CREATE CLUSTER` statement.
///
/// Only managed clusters are supported; for an unmanaged plan this bails with
/// an "unsupported" error.
///
/// # Panics
///
/// Panics if the plan has a `Refresh` schedule and a replication factor
/// greater than 1.
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            disk,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Destructure without `..` so that every field of
            // `OptimizerFeatureOverrides` is named here; fields bound to `_`
            // have no corresponding cluster feature option.
            let OptimizerFeatureOverrides {
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_reduce_reduction: _,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // An empty list means the option was not set (planning maps an
            // absent option to an empty list).
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // REPLICATION FACTOR is only emitted for MANUAL schedules; planning
            // rejects it in combination with any other schedule.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                seen: Default::default(),
                availability_zones,
                disk: Some(disk),
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
4818
// Options of a single cluster replica definition. Yields
// `ReplicaOptionExtracted`, consumed by `plan_replica_config`. `Internal` and
// `IntrospectionDebugging` default to `false`; the remaining options are
// optional.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
4834
/// Plans the options of a single replica into a [`ReplicaConfig`].
///
/// Which options are present decides the result:
///
/// * `SIZE` without any address or `WORKERS` option yields
///   `ReplicaConfig::Orchestrated`.
/// * No `SIZE`, `AVAILABILITY ZONE`, or `BILLED AS`, with the address options
///   present, yields `ReplicaConfig::Unorchestrated` (behind an unsafe
///   feature flag).
/// * Any other combination is rejected.
///
/// # Errors
///
/// Returns an error if option extraction fails, a required feature flag is
/// disabled, a required option is missing, the option combination is invalid,
/// the address lists differ in length, or `WORKERS` is 0.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        compute_addresses,
        computectl_addresses,
        disk: disk_in,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storage_addresses,
        storagectl_addresses,
        workers,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    // Any explicit DISK setting requires the feature flag.
    if disk_in.is_some() {
        scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
    }

    // The arms below are order-sensitive: earlier, more specific patterns take
    // precedence over the later ones.
    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        storage_addresses,
        computectl_addresses,
        compute_addresses,
        workers,
    ) {
        // Nothing that identifies either replica kind was given.
        (None, _, None, None, None, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        // Orchestrated replica: SIZE given, no address or WORKERS options.
        (Some(size), availability_zone, billed_as, None, None, None, None, None) => {
            let disk_default = scx.catalog.system_vars().disk_cluster_replicas_default();
            let mut disk = disk_in.unwrap_or(disk_default);

            // For sizes the catalog reports as "cc", DISK may not be specified
            // at all and disk is always enabled.
            if scx.catalog.is_cluster_size_cc(&size) {
                if disk_in.is_some() {
                    sql_bail!(
                        "DISK option not supported for non-legacy cluster sizes because disk is always enabled"
                    );
                }
                disk = true;
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                disk,
                billed_as,
                internal,
            })
        }

        // Unorchestrated replica: no SIZE, AVAILABILITY ZONE, or BILLED AS.
        (
            None,
            None,
            None,
            storagectl_addresses,
            storage_addresses,
            computectl_addresses,
            compute_addresses,
            workers,
        ) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            // All four address lists are mandatory.
            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(storage_addrs) = storage_addresses else {
                sql_bail!("missing STORAGE ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };
            let Some(compute_addrs) = compute_addresses else {
                sql_bail!("missing COMPUTE ADDRESSES option");
            };
            let workers = workers.unwrap_or(1);

            // Taken together, these three checks require all four lists to
            // have the same length.
            if computectl_addrs.len() != compute_addrs.len() {
                sql_bail!("COMPUTECTL ADDRESSES and COMPUTE ADDRESSES must have the same length");
            }
            if storagectl_addrs.len() != storage_addrs.len() {
                sql_bail!("STORAGECTL ADDRESSES and STORAGE ADDRESSES must have the same length");
            }
            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if workers == 0 {
                sql_bail!("WORKERS must be greater than 0");
            }

            if disk_in.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                storage_addrs,
                computectl_addrs,
                compute_addrs,
                workers: workers.into(),
                compute,
            })
        }
        // Options from both replica kinds were combined.
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
4972
4973fn plan_compute_replica_config(
4977 introspection_interval: Option<OptionalDuration>,
4978 introspection_debugging: bool,
4979) -> Result<ComputeReplicaConfig, PlanError> {
4980 let introspection_interval = introspection_interval
4981 .map(|OptionalDuration(i)| i)
4982 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
4983 let introspection = match introspection_interval {
4984 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
4985 interval,
4986 debugging: introspection_debugging,
4987 }),
4988 None if introspection_debugging => {
4989 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
4990 }
4991 None => None,
4992 };
4993 let compute = ComputeReplicaConfig { introspection };
4994 Ok(compute)
4995}
4996
4997fn unplan_compute_replica_config(
5001 compute_replica_config: ComputeReplicaConfig,
5002) -> (Option<OptionalDuration>, bool) {
5003 match compute_replica_config.introspection {
5004 Some(ComputeReplicaIntrospectionConfig {
5005 debugging,
5006 interval,
5007 }) => (Some(OptionalDuration(Some(interval))), debugging),
5008 None => (Some(OptionalDuration(None)), false),
5009 }
5010}
5011
5012fn plan_cluster_schedule(
5016 schedule: ClusterScheduleOptionValue,
5017) -> Result<ClusterSchedule, PlanError> {
5018 Ok(match schedule {
5019 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5020 ClusterScheduleOptionValue::Refresh {
5022 hydration_time_estimate: None,
5023 } => ClusterSchedule::Refresh {
5024 hydration_time_estimate: Duration::from_millis(0),
5025 },
5026 ClusterScheduleOptionValue::Refresh {
5028 hydration_time_estimate: Some(interval_value),
5029 } => {
5030 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5031 if interval.as_microseconds() < 0 {
5032 sql_bail!(
5033 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5034 interval
5035 );
5036 }
5037 if interval.months != 0 {
5038 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5042 }
5043 let duration = interval.duration()?;
5044 if u64::try_from(duration.as_millis()).is_err()
5045 || Interval::from_duration(&duration).is_err()
5046 {
5047 sql_bail!("HYDRATION TIME ESTIMATE too large");
5048 }
5049 ClusterSchedule::Refresh {
5050 hydration_time_estimate: duration,
5051 }
5052 }
5053 })
5054}
5055
5056fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5060 match schedule {
5061 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5062 ClusterSchedule::Refresh {
5063 hydration_time_estimate,
5064 } => {
5065 let interval = Interval::from_duration(&hydration_time_estimate)
5066 .expect("planning ensured that this is convertible back to Interval");
5067 let interval_value = literal::unplan_interval(&interval);
5068 ClusterScheduleOptionValue::Refresh {
5069 hydration_time_estimate: Some(interval_value),
5070 }
5071 }
5072 }
5073}
5074
5075pub fn describe_create_cluster_replica(
5076 _: &StatementContext,
5077 _: CreateClusterReplicaStatement<Aug>,
5078) -> Result<StatementDesc, PlanError> {
5079 Ok(StatementDesc::new(None))
5080}
5081
5082pub fn plan_create_cluster_replica(
5083 scx: &StatementContext,
5084 CreateClusterReplicaStatement {
5085 definition: ReplicaDefinition { name, options },
5086 of_cluster,
5087 }: CreateClusterReplicaStatement<Aug>,
5088) -> Result<Plan, PlanError> {
5089 let cluster = scx
5090 .catalog
5091 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5092 let current_replica_count = cluster.replica_ids().iter().count();
5093 if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5094 let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5095 return Err(PlanError::CreateReplicaFailStorageObjects {
5096 current_replica_count,
5097 internal_replica_count,
5098 hypothetical_replica_count: current_replica_count + 1,
5099 });
5100 }
5101
5102 let config = plan_replica_config(scx, options)?;
5103
5104 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5105 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5106 return Err(PlanError::MangedReplicaName(name.into_string()));
5107 }
5108 } else {
5109 ensure_cluster_is_not_managed(scx, cluster.id())?;
5110 }
5111
5112 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5113 name: normalize::ident(name),
5114 cluster_id: cluster.id(),
5115 config,
5116 }))
5117}
5118
5119pub fn describe_create_secret(
5120 _: &StatementContext,
5121 _: CreateSecretStatement<Aug>,
5122) -> Result<StatementDesc, PlanError> {
5123 Ok(StatementDesc::new(None))
5124}
5125
5126pub fn plan_create_secret(
5127 scx: &StatementContext,
5128 stmt: CreateSecretStatement<Aug>,
5129) -> Result<Plan, PlanError> {
5130 let CreateSecretStatement {
5131 name,
5132 if_not_exists,
5133 value,
5134 } = &stmt;
5135
5136 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5137 let mut create_sql_statement = stmt.clone();
5138 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5139 let create_sql =
5140 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5141 let secret_as = query::plan_secret_as(scx, value.clone())?;
5142
5143 let secret = Secret {
5144 create_sql,
5145 secret_as,
5146 };
5147
5148 Ok(Plan::CreateSecret(CreateSecretPlan {
5149 name,
5150 secret,
5151 if_not_exists: *if_not_exists,
5152 }))
5153}
5154
5155pub fn describe_create_connection(
5156 _: &StatementContext,
5157 _: CreateConnectionStatement<Aug>,
5158) -> Result<StatementDesc, PlanError> {
5159 Ok(StatementDesc::new(None))
5160}
5161
// `WITH (...)` options of `CREATE CONNECTION`: only `VALIDATE`, a `bool`.
// Yields `CreateConnectionOptionExtracted`, read by `plan_create_connection`.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5163
5164pub fn plan_create_connection(
5165 scx: &StatementContext,
5166 mut stmt: CreateConnectionStatement<Aug>,
5167) -> Result<Plan, PlanError> {
5168 let CreateConnectionStatement {
5169 name,
5170 connection_type,
5171 values,
5172 if_not_exists,
5173 with_options,
5174 } = stmt.clone();
5175 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5176 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5177 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5178
5179 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5180 if options.validate.is_some() {
5181 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5182 }
5183 let validate = match options.validate {
5184 Some(val) => val,
5185 None => {
5186 scx.catalog
5187 .system_vars()
5188 .enable_default_connection_validation()
5189 && details.to_connection().validate_by_default()
5190 }
5191 };
5192
5193 let full_name = scx.catalog.resolve_full_name(&name);
5195 let partial_name = PartialItemName::from(full_name.clone());
5196 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5197 return Err(PlanError::ItemAlreadyExists {
5198 name: full_name.to_string(),
5199 item_type: item.item_type(),
5200 });
5201 }
5202
5203 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5206 stmt.values.retain(|v| {
5207 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5208 });
5209 stmt.values.push(ConnectionOption {
5210 name: ConnectionOptionName::PublicKey1,
5211 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5212 });
5213 stmt.values.push(ConnectionOption {
5214 name: ConnectionOptionName::PublicKey2,
5215 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5216 });
5217 }
5218 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5219
5220 let plan = CreateConnectionPlan {
5221 name,
5222 if_not_exists,
5223 connection: crate::plan::Connection {
5224 create_sql,
5225 details,
5226 },
5227 validate,
5228 };
5229 Ok(Plan::CreateConnection(plan))
5230}
5231
5232fn plan_drop_database(
5233 scx: &StatementContext,
5234 if_exists: bool,
5235 name: &UnresolvedDatabaseName,
5236 cascade: bool,
5237) -> Result<Option<DatabaseId>, PlanError> {
5238 Ok(match resolve_database(scx, name, if_exists)? {
5239 Some(database) => {
5240 if !cascade && database.has_schemas() {
5241 sql_bail!(
5242 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5243 name,
5244 );
5245 }
5246 Some(database.id())
5247 }
5248 None => None,
5249 })
5250}
5251
5252pub fn describe_drop_objects(
5253 _: &StatementContext,
5254 _: DropObjectsStatement,
5255) -> Result<StatementDesc, PlanError> {
5256 Ok(StatementDesc::new(None))
5257}
5258
5259pub fn plan_drop_objects(
5260 scx: &mut StatementContext,
5261 DropObjectsStatement {
5262 object_type,
5263 if_exists,
5264 names,
5265 cascade,
5266 }: DropObjectsStatement,
5267) -> Result<Plan, PlanError> {
5268 assert_ne!(
5269 object_type,
5270 mz_sql_parser::ast::ObjectType::Func,
5271 "rejected in parser"
5272 );
5273 let object_type = object_type.into();
5274
5275 let mut referenced_ids = Vec::new();
5276 for name in names {
5277 let id = match &name {
5278 UnresolvedObjectName::Cluster(name) => {
5279 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5280 }
5281 UnresolvedObjectName::ClusterReplica(name) => {
5282 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5283 }
5284 UnresolvedObjectName::Database(name) => {
5285 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5286 }
5287 UnresolvedObjectName::Schema(name) => {
5288 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5289 }
5290 UnresolvedObjectName::Role(name) => {
5291 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5292 }
5293 UnresolvedObjectName::Item(name) => {
5294 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5295 .map(ObjectId::Item)
5296 }
5297 UnresolvedObjectName::NetworkPolicy(name) => {
5298 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5299 }
5300 };
5301 match id {
5302 Some(id) => referenced_ids.push(id),
5303 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5304 name: name.to_ast_string_simple(),
5305 object_type,
5306 }),
5307 }
5308 }
5309 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5310
5311 Ok(Plan::DropObjects(DropObjectsPlan {
5312 referenced_ids,
5313 drop_ids,
5314 object_type,
5315 }))
5316}
5317
5318fn plan_drop_schema(
5319 scx: &StatementContext,
5320 if_exists: bool,
5321 name: &UnresolvedSchemaName,
5322 cascade: bool,
5323) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5324 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5325 Some((database_spec, schema_spec)) => {
5326 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5327 sql_bail!(
5328 "cannot drop schema {name} because it is required by the database system",
5329 );
5330 }
5331 if let SchemaSpecifier::Temporary = schema_spec {
5332 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5333 }
5334 let schema = scx.get_schema(&database_spec, &schema_spec);
5335 if !cascade && schema.has_items() {
5336 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5337 sql_bail!(
5338 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5339 full_schema_name
5340 );
5341 }
5342 Some((database_spec, schema_spec))
5343 }
5344 None => None,
5345 })
5346}
5347
5348fn plan_drop_role(
5349 scx: &StatementContext,
5350 if_exists: bool,
5351 name: &Ident,
5352) -> Result<Option<RoleId>, PlanError> {
5353 match scx.catalog.resolve_role(name.as_str()) {
5354 Ok(role) => {
5355 let id = role.id();
5356 if &id == scx.catalog.active_role_id() {
5357 sql_bail!("current role cannot be dropped");
5358 }
5359 for role in scx.catalog.get_roles() {
5360 for (member_id, grantor_id) in role.membership() {
5361 if &id == grantor_id {
5362 let member_role = scx.catalog.get_role(member_id);
5363 sql_bail!(
5364 "cannot drop role {}: still depended up by membership of role {} in role {}",
5365 name.as_str(),
5366 role.name(),
5367 member_role.name()
5368 );
5369 }
5370 }
5371 }
5372 Ok(Some(role.id()))
5373 }
5374 Err(_) if if_exists => Ok(None),
5375 Err(e) => Err(e.into()),
5376 }
5377}
5378
5379fn plan_drop_cluster(
5380 scx: &StatementContext,
5381 if_exists: bool,
5382 name: &Ident,
5383 cascade: bool,
5384) -> Result<Option<ClusterId>, PlanError> {
5385 Ok(match resolve_cluster(scx, name, if_exists)? {
5386 Some(cluster) => {
5387 if !cascade && !cluster.bound_objects().is_empty() {
5388 return Err(PlanError::DependentObjectsStillExist {
5389 object_type: "cluster".to_string(),
5390 object_name: cluster.name().to_string(),
5391 dependents: Vec::new(),
5392 });
5393 }
5394 Some(cluster.id())
5395 }
5396 None => None,
5397 })
5398}
5399
5400fn plan_drop_network_policy(
5401 scx: &StatementContext,
5402 if_exists: bool,
5403 name: &Ident,
5404) -> Result<Option<NetworkPolicyId>, PlanError> {
5405 match scx.catalog.resolve_network_policy(name.as_str()) {
5406 Ok(policy) => {
5407 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5410 Err(PlanError::NetworkPolicyInUse)
5411 } else {
5412 Ok(Some(policy.id()))
5413 }
5414 }
5415 Err(_) if if_exists => Ok(None),
5416 Err(e) => Err(e.into()),
5417 }
5418}
5419
5420fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5423 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5425 false
5426 } else {
5427 cluster.bound_objects().iter().any(|id| {
5429 let item = scx.catalog.get_item(id);
5430 matches!(
5431 item.item_type(),
5432 CatalogItemType::Sink | CatalogItemType::Source
5433 )
5434 })
5435 }
5436}
5437
5438fn plan_drop_cluster_replica(
5439 scx: &StatementContext,
5440 if_exists: bool,
5441 name: &QualifiedReplica,
5442) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5443 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5444 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5445}
5446
5447fn plan_drop_item(
5449 scx: &StatementContext,
5450 object_type: ObjectType,
5451 if_exists: bool,
5452 name: UnresolvedItemName,
5453 cascade: bool,
5454) -> Result<Option<CatalogItemId>, PlanError> {
5455 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5456 Ok(r) => r,
5457 Err(PlanError::MismatchedObjectType {
5459 name,
5460 is_type: ObjectType::MaterializedView,
5461 expected_type: ObjectType::View,
5462 }) => {
5463 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5464 }
5465 e => e?,
5466 };
5467
5468 Ok(match resolved {
5469 Some(catalog_item) => {
5470 if catalog_item.id().is_system() {
5471 sql_bail!(
5472 "cannot drop {} {} because it is required by the database system",
5473 catalog_item.item_type(),
5474 scx.catalog.minimal_qualification(catalog_item.name()),
5475 );
5476 }
5477
5478 if !cascade {
5479 for id in catalog_item.used_by() {
5480 let dep = scx.catalog.get_item(id);
5481 if dependency_prevents_drop(object_type, dep) {
5482 return Err(PlanError::DependentObjectsStillExist {
5483 object_type: catalog_item.item_type().to_string(),
5484 object_name: scx
5485 .catalog
5486 .minimal_qualification(catalog_item.name())
5487 .to_string(),
5488 dependents: vec![(
5489 dep.item_type().to_string(),
5490 scx.catalog.minimal_qualification(dep.name()).to_string(),
5491 )],
5492 });
5493 }
5494 }
5495 }
5498 Some(catalog_item.id())
5499 }
5500 None => None,
5501 })
5502}
5503
5504fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5506 match object_type {
5507 ObjectType::Type => true,
5508 _ => match dep.item_type() {
5509 CatalogItemType::Func
5510 | CatalogItemType::Table
5511 | CatalogItemType::Source
5512 | CatalogItemType::View
5513 | CatalogItemType::MaterializedView
5514 | CatalogItemType::Sink
5515 | CatalogItemType::Type
5516 | CatalogItemType::Secret
5517 | CatalogItemType::Connection
5518 | CatalogItemType::ContinualTask => true,
5519 CatalogItemType::Index => false,
5520 },
5521 }
5522}
5523
5524pub fn describe_alter_index_options(
5525 _: &StatementContext,
5526 _: AlterIndexStatement<Aug>,
5527) -> Result<StatementDesc, PlanError> {
5528 Ok(StatementDesc::new(None))
5529}
5530
5531pub fn describe_drop_owned(
5532 _: &StatementContext,
5533 _: DropOwnedStatement<Aug>,
5534) -> Result<StatementDesc, PlanError> {
5535 Ok(StatementDesc::new(None))
5536}
5537
5538pub fn plan_drop_owned(
5539 scx: &StatementContext,
5540 drop: DropOwnedStatement<Aug>,
5541) -> Result<Plan, PlanError> {
5542 let cascade = drop.cascade();
5543 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5544 let mut drop_ids = Vec::new();
5545 let mut privilege_revokes = Vec::new();
5546 let mut default_privilege_revokes = Vec::new();
5547
5548 fn update_privilege_revokes(
5549 object_id: SystemObjectId,
5550 privileges: &PrivilegeMap,
5551 role_ids: &BTreeSet<RoleId>,
5552 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5553 ) {
5554 privilege_revokes.extend(iter::zip(
5555 iter::repeat(object_id),
5556 privileges
5557 .all_values()
5558 .filter(|privilege| role_ids.contains(&privilege.grantee))
5559 .cloned(),
5560 ));
5561 }
5562
5563 for replica in scx.catalog.get_cluster_replicas() {
5565 if role_ids.contains(&replica.owner_id()) {
5566 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5567 }
5568 }
5569
5570 for cluster in scx.catalog.get_clusters() {
5572 if role_ids.contains(&cluster.owner_id()) {
5573 if !cascade {
5575 let non_owned_bound_objects: Vec<_> = cluster
5576 .bound_objects()
5577 .into_iter()
5578 .map(|item_id| scx.catalog.get_item(item_id))
5579 .filter(|item| !role_ids.contains(&item.owner_id()))
5580 .collect();
5581 if !non_owned_bound_objects.is_empty() {
5582 let names: Vec<_> = non_owned_bound_objects
5583 .into_iter()
5584 .map(|item| {
5585 (
5586 item.item_type().to_string(),
5587 scx.catalog.resolve_full_name(item.name()).to_string(),
5588 )
5589 })
5590 .collect();
5591 return Err(PlanError::DependentObjectsStillExist {
5592 object_type: "cluster".to_string(),
5593 object_name: cluster.name().to_string(),
5594 dependents: names,
5595 });
5596 }
5597 }
5598 drop_ids.push(cluster.id().into());
5599 }
5600 update_privilege_revokes(
5601 SystemObjectId::Object(cluster.id().into()),
5602 cluster.privileges(),
5603 &role_ids,
5604 &mut privilege_revokes,
5605 );
5606 }
5607
5608 for item in scx.catalog.get_items() {
5610 if role_ids.contains(&item.owner_id()) {
5611 if !cascade {
5612 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5614 let non_owned_dependencies: Vec<_> = used_by
5615 .into_iter()
5616 .map(|item_id| scx.catalog.get_item(item_id))
5617 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5618 .filter(|item| !role_ids.contains(&item.owner_id()))
5619 .collect();
5620 if !non_owned_dependencies.is_empty() {
5621 let names: Vec<_> = non_owned_dependencies
5622 .into_iter()
5623 .map(|item| {
5624 let item_typ = item.item_type().to_string();
5625 let item_name =
5626 scx.catalog.resolve_full_name(item.name()).to_string();
5627 (item_typ, item_name)
5628 })
5629 .collect();
5630 Err(PlanError::DependentObjectsStillExist {
5631 object_type: item.item_type().to_string(),
5632 object_name: scx
5633 .catalog
5634 .resolve_full_name(item.name())
5635 .to_string()
5636 .to_string(),
5637 dependents: names,
5638 })
5639 } else {
5640 Ok(())
5641 }
5642 };
5643
5644 if let Some(id) = item.progress_id() {
5647 let progress_item = scx.catalog.get_item(&id);
5648 check_if_dependents_exist(progress_item.used_by())?;
5649 }
5650 check_if_dependents_exist(item.used_by())?;
5651 }
5652 drop_ids.push(item.id().into());
5653 }
5654 update_privilege_revokes(
5655 SystemObjectId::Object(item.id().into()),
5656 item.privileges(),
5657 &role_ids,
5658 &mut privilege_revokes,
5659 );
5660 }
5661
5662 for schema in scx.catalog.get_schemas() {
5664 if !schema.id().is_temporary() {
5665 if role_ids.contains(&schema.owner_id()) {
5666 if !cascade {
5667 let non_owned_dependencies: Vec<_> = schema
5668 .item_ids()
5669 .map(|item_id| scx.catalog.get_item(&item_id))
5670 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5671 .filter(|item| !role_ids.contains(&item.owner_id()))
5672 .collect();
5673 if !non_owned_dependencies.is_empty() {
5674 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5675 sql_bail!(
5676 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5677 full_schema_name.to_string().quoted()
5678 );
5679 }
5680 }
5681 drop_ids.push((*schema.database(), *schema.id()).into())
5682 }
5683 update_privilege_revokes(
5684 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5685 schema.privileges(),
5686 &role_ids,
5687 &mut privilege_revokes,
5688 );
5689 }
5690 }
5691
5692 for database in scx.catalog.get_databases() {
5694 if role_ids.contains(&database.owner_id()) {
5695 if !cascade {
5696 let non_owned_schemas: Vec<_> = database
5697 .schemas()
5698 .into_iter()
5699 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5700 .collect();
5701 if !non_owned_schemas.is_empty() {
5702 sql_bail!(
5703 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5704 database.name().quoted(),
5705 );
5706 }
5707 }
5708 drop_ids.push(database.id().into());
5709 }
5710 update_privilege_revokes(
5711 SystemObjectId::Object(database.id().into()),
5712 database.privileges(),
5713 &role_ids,
5714 &mut privilege_revokes,
5715 );
5716 }
5717
5718 update_privilege_revokes(
5720 SystemObjectId::System,
5721 scx.catalog.get_system_privileges(),
5722 &role_ids,
5723 &mut privilege_revokes,
5724 );
5725
5726 for (default_privilege_object, default_privilege_acl_items) in
5727 scx.catalog.get_default_privileges()
5728 {
5729 for default_privilege_acl_item in default_privilege_acl_items {
5730 if role_ids.contains(&default_privilege_object.role_id)
5731 || role_ids.contains(&default_privilege_acl_item.grantee)
5732 {
5733 default_privilege_revokes.push((
5734 default_privilege_object.clone(),
5735 default_privilege_acl_item.clone(),
5736 ));
5737 }
5738 }
5739 }
5740
5741 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5742
5743 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5744 if !system_ids.is_empty() {
5745 let mut owners = system_ids
5746 .into_iter()
5747 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5748 .collect::<BTreeSet<_>>()
5749 .into_iter()
5750 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5751 sql_bail!(
5752 "cannot drop objects owned by role {} because they are required by the database system",
5753 owners.join(", "),
5754 );
5755 }
5756
5757 Ok(Plan::DropOwned(DropOwnedPlan {
5758 role_ids: role_ids.into_iter().collect(),
5759 drop_ids,
5760 privilege_revokes,
5761 default_privilege_revokes,
5762 }))
5763}
5764
5765fn plan_retain_history_option(
5766 scx: &StatementContext,
5767 retain_history: Option<OptionalDuration>,
5768) -> Result<Option<CompactionWindow>, PlanError> {
5769 if let Some(OptionalDuration(lcw)) = retain_history {
5770 Ok(Some(plan_retain_history(scx, lcw)?))
5771 } else {
5772 Ok(None)
5773 }
5774}
5775
5776fn plan_retain_history(
5782 scx: &StatementContext,
5783 lcw: Option<Duration>,
5784) -> Result<CompactionWindow, PlanError> {
5785 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5786 match lcw {
5787 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5792 option_name: "RETAIN HISTORY".to_string(),
5793 err: Box::new(PlanError::Unstructured(
5794 "internal error: unexpectedly zero".to_string(),
5795 )),
5796 }),
5797 Some(duration) => {
5798 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5801 && scx
5802 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5803 .is_err()
5804 {
5805 return Err(PlanError::RetainHistoryLow {
5806 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5807 });
5808 }
5809 Ok(duration.try_into()?)
5810 }
5811 None => {
5814 if scx
5815 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5816 .is_err()
5817 {
5818 Err(PlanError::RetainHistoryRequired)
5819 } else {
5820 Ok(CompactionWindow::DisableCompaction)
5821 }
5822 }
5823 }
5824}
5825
5826generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5827
5828fn plan_index_options(
5829 scx: &StatementContext,
5830 with_opts: Vec<IndexOption<Aug>>,
5831) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5832 if !with_opts.is_empty() {
5833 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5835 }
5836
5837 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5838
5839 let mut out = Vec::with_capacity(1);
5840 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5841 out.push(crate::plan::IndexOption::RetainHistory(cw));
5842 }
5843 Ok(out)
5844}
5845
5846generate_extracted_config!(
5847 TableOption,
5848 (PartitionBy, Vec<Ident>),
5849 (RetainHistory, OptionalDuration),
5850 (RedactedTest, String)
5851);
5852
5853fn plan_table_options(
5854 scx: &StatementContext,
5855 desc: &RelationDesc,
5856 with_opts: Vec<TableOption<Aug>>,
5857) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5858 let TableOptionExtracted {
5859 partition_by,
5860 retain_history,
5861 redacted_test,
5862 ..
5863 }: TableOptionExtracted = with_opts.try_into()?;
5864
5865 if let Some(partition_by) = partition_by {
5866 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5867 check_partition_by(desc, partition_by)?;
5868 }
5869
5870 if redacted_test.is_some() {
5871 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5872 }
5873
5874 let mut out = Vec::with_capacity(1);
5875 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5876 out.push(crate::plan::TableOption::RetainHistory(cw));
5877 }
5878 Ok(out)
5879}
5880
5881pub fn plan_alter_index_options(
5882 scx: &mut StatementContext,
5883 AlterIndexStatement {
5884 index_name,
5885 if_exists,
5886 action,
5887 }: AlterIndexStatement<Aug>,
5888) -> Result<Plan, PlanError> {
5889 let object_type = ObjectType::Index;
5890 match action {
5891 AlterIndexAction::ResetOptions(options) => {
5892 let mut options = options.into_iter();
5893 if let Some(opt) = options.next() {
5894 match opt {
5895 IndexOptionName::RetainHistory => {
5896 if options.next().is_some() {
5897 sql_bail!("RETAIN HISTORY must be only option");
5898 }
5899 return alter_retain_history(
5900 scx,
5901 object_type,
5902 if_exists,
5903 UnresolvedObjectName::Item(index_name),
5904 None,
5905 );
5906 }
5907 }
5908 }
5909 sql_bail!("expected option");
5910 }
5911 AlterIndexAction::SetOptions(options) => {
5912 let mut options = options.into_iter();
5913 if let Some(opt) = options.next() {
5914 match opt.name {
5915 IndexOptionName::RetainHistory => {
5916 if options.next().is_some() {
5917 sql_bail!("RETAIN HISTORY must be only option");
5918 }
5919 return alter_retain_history(
5920 scx,
5921 object_type,
5922 if_exists,
5923 UnresolvedObjectName::Item(index_name),
5924 opt.value,
5925 );
5926 }
5927 }
5928 }
5929 sql_bail!("expected option");
5930 }
5931 }
5932}
5933
5934pub fn describe_alter_cluster_set_options(
5935 _: &StatementContext,
5936 _: AlterClusterStatement<Aug>,
5937) -> Result<StatementDesc, PlanError> {
5938 Ok(StatementDesc::new(None))
5939}
5940
5941pub fn plan_alter_cluster(
5942 scx: &mut StatementContext,
5943 AlterClusterStatement {
5944 name,
5945 action,
5946 if_exists,
5947 }: AlterClusterStatement<Aug>,
5948) -> Result<Plan, PlanError> {
5949 let cluster = match resolve_cluster(scx, &name, if_exists)? {
5950 Some(entry) => entry,
5951 None => {
5952 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5953 name: name.to_ast_string_simple(),
5954 object_type: ObjectType::Cluster,
5955 });
5956
5957 return Ok(Plan::AlterNoop(AlterNoopPlan {
5958 object_type: ObjectType::Cluster,
5959 }));
5960 }
5961 };
5962
5963 let mut options: PlanClusterOption = Default::default();
5964 let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
5965
5966 match action {
5967 AlterClusterAction::SetOptions {
5968 options: set_options,
5969 with_options,
5970 } => {
5971 let ClusterOptionExtracted {
5972 availability_zones,
5973 introspection_debugging,
5974 introspection_interval,
5975 managed,
5976 replicas: replica_defs,
5977 replication_factor,
5978 seen: _,
5979 size,
5980 disk,
5981 schedule,
5982 workload_class,
5983 }: ClusterOptionExtracted = set_options.try_into()?;
5984
5985 if !scx.catalog.active_role_id().is_system() {
5986 if workload_class.is_some() {
5987 sql_bail!("WORKLOAD CLASS not supported for non-system users");
5988 }
5989 }
5990
5991 match managed.unwrap_or_else(|| cluster.is_managed()) {
5992 true => {
5993 let alter_strategy_extracted =
5994 ClusterAlterOptionExtracted::try_from(with_options)?;
5995 alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
5996
5997 match alter_strategy {
5998 AlterClusterPlanStrategy::None => {}
5999 _ => {
6000 scx.require_feature_flag(
6001 &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6002 )?;
6003 }
6004 }
6005
6006 if replica_defs.is_some() {
6007 sql_bail!("REPLICAS not supported for managed clusters");
6008 }
6009 if schedule.is_some()
6010 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6011 {
6012 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6013 }
6014
6015 if let Some(replication_factor) = replication_factor {
6016 if schedule.is_some()
6017 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6018 {
6019 sql_bail!(
6020 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6021 );
6022 }
6023 if let Some(current_schedule) = cluster.schedule() {
6024 if !matches!(current_schedule, ClusterSchedule::Manual) {
6025 sql_bail!(
6026 "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6027 );
6028 }
6029 }
6030
6031 let internal_replica_count =
6032 cluster.replicas().iter().filter(|r| r.internal()).count();
6033 let hypothetical_replica_count =
6034 internal_replica_count + usize::cast_from(replication_factor);
6035
6036 if contains_single_replica_objects(scx, cluster)
6039 && hypothetical_replica_count > 1
6040 {
6041 return Err(PlanError::CreateReplicaFailStorageObjects {
6042 current_replica_count: cluster.replica_ids().iter().count(),
6043 internal_replica_count,
6044 hypothetical_replica_count,
6045 });
6046 }
6047 } else if alter_strategy.is_some() {
6048 let internal_replica_count =
6052 cluster.replicas().iter().filter(|r| r.internal()).count();
6053 let hypothetical_replica_count = internal_replica_count * 2;
6054 if contains_single_replica_objects(scx, cluster) {
6055 return Err(PlanError::CreateReplicaFailStorageObjects {
6056 current_replica_count: cluster.replica_ids().iter().count(),
6057 internal_replica_count,
6058 hypothetical_replica_count,
6059 });
6060 }
6061 }
6062 }
6063 false => {
6064 if !alter_strategy.is_none() {
6065 sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6066 }
6067 if availability_zones.is_some() {
6068 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6069 }
6070 if replication_factor.is_some() {
6071 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6072 }
6073 if introspection_debugging.is_some() {
6074 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6075 }
6076 if introspection_interval.is_some() {
6077 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6078 }
6079 if size.is_some() {
6080 sql_bail!("SIZE not supported for unmanaged clusters");
6081 }
6082 if disk.is_some() {
6083 sql_bail!("DISK not supported for unmanaged clusters");
6084 }
6085 if schedule.is_some()
6086 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6087 {
6088 sql_bail!(
6089 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6090 );
6091 }
6092 if let Some(current_schedule) = cluster.schedule() {
6093 if !matches!(current_schedule, ClusterSchedule::Manual)
6094 && schedule.is_none()
6095 {
6096 sql_bail!(
6097 "when switching a cluster to unmanaged, if the managed \
6098 cluster's SCHEDULE is anything other than MANUAL, you have to \
6099 explicitly set the SCHEDULE to MANUAL"
6100 );
6101 }
6102 }
6103 }
6104 }
6105
6106 let mut replicas = vec![];
6107 for ReplicaDefinition { name, options } in
6108 replica_defs.into_iter().flat_map(Vec::into_iter)
6109 {
6110 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6111 }
6112
6113 if let Some(managed) = managed {
6114 options.managed = AlterOptionParameter::Set(managed);
6115 }
6116 if let Some(replication_factor) = replication_factor {
6117 options.replication_factor = AlterOptionParameter::Set(replication_factor);
6118 }
6119 if let Some(size) = &size {
6120 if scx.catalog.is_cluster_size_cc(size) {
6128 if disk.is_some() {
6129 sql_bail!(
6130 "DISK option not supported for modern cluster sizes because disk is always enabled"
6131 );
6132 } else {
6133 options.disk = AlterOptionParameter::Set(true);
6134 }
6135 }
6136 options.size = AlterOptionParameter::Set(size.clone());
6137 }
6138 if let Some(availability_zones) = availability_zones {
6139 options.availability_zones = AlterOptionParameter::Set(availability_zones);
6140 }
6141 if let Some(introspection_debugging) = introspection_debugging {
6142 options.introspection_debugging =
6143 AlterOptionParameter::Set(introspection_debugging);
6144 }
6145 if let Some(introspection_interval) = introspection_interval {
6146 options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6147 }
6148 if let Some(disk) = disk {
6149 let size = size.as_deref().unwrap_or_else(|| {
6157 cluster.managed_size().expect("cluster known to be managed")
6158 });
6159 if scx.catalog.is_cluster_size_cc(size) {
6160 sql_bail!(
6161 "DISK option not supported for modern cluster sizes because disk is always enabled"
6162 );
6163 }
6164
6165 if disk {
6166 scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
6167 }
6168 options.disk = AlterOptionParameter::Set(disk);
6169 }
6170 if !replicas.is_empty() {
6171 options.replicas = AlterOptionParameter::Set(replicas);
6172 }
6173 if let Some(schedule) = schedule {
6174 options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6175 }
6176 if let Some(workload_class) = workload_class {
6177 options.workload_class = AlterOptionParameter::Set(workload_class.0);
6178 }
6179 }
6180 AlterClusterAction::ResetOptions(reset_options) => {
6181 use AlterOptionParameter::Reset;
6182 use ClusterOptionName::*;
6183
6184 if !scx.catalog.active_role_id().is_system() {
6185 if reset_options.contains(&WorkloadClass) {
6186 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6187 }
6188 }
6189
6190 for option in reset_options {
6191 match option {
6192 AvailabilityZones => options.availability_zones = Reset,
6193 Disk => options.disk = Reset,
6194 IntrospectionInterval => options.introspection_interval = Reset,
6195 IntrospectionDebugging => options.introspection_debugging = Reset,
6196 Managed => options.managed = Reset,
6197 Replicas => options.replicas = Reset,
6198 ReplicationFactor => options.replication_factor = Reset,
6199 Size => options.size = Reset,
6200 Schedule => options.schedule = Reset,
6201 WorkloadClass => options.workload_class = Reset,
6202 }
6203 }
6204 }
6205 }
6206 Ok(Plan::AlterCluster(AlterClusterPlan {
6207 id: cluster.id(),
6208 name: cluster.name().to_string(),
6209 options,
6210 strategy: alter_strategy,
6211 }))
6212}
6213
6214pub fn describe_alter_set_cluster(
6215 _: &StatementContext,
6216 _: AlterSetClusterStatement<Aug>,
6217) -> Result<StatementDesc, PlanError> {
6218 Ok(StatementDesc::new(None))
6219}
6220
6221pub fn plan_alter_item_set_cluster(
6222 scx: &StatementContext,
6223 AlterSetClusterStatement {
6224 if_exists,
6225 set_cluster: in_cluster_name,
6226 name,
6227 object_type,
6228 }: AlterSetClusterStatement<Aug>,
6229) -> Result<Plan, PlanError> {
6230 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6231
6232 let object_type = object_type.into();
6233
6234 match object_type {
6236 ObjectType::MaterializedView => {}
6237 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6238 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6239 }
6240 _ => {
6241 bail_never_supported!(
6242 format!("ALTER {object_type} SET CLUSTER"),
6243 "sql/alter-set-cluster/",
6244 format!("{object_type} has no associated cluster")
6245 )
6246 }
6247 }
6248
6249 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6250
6251 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6252 Some(entry) => {
6253 let current_cluster = entry.cluster_id();
6254 let Some(current_cluster) = current_cluster else {
6255 sql_bail!("No cluster associated with {name}");
6256 };
6257
6258 if current_cluster == in_cluster.id() {
6259 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6260 } else {
6261 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6262 id: entry.id(),
6263 set_cluster: in_cluster.id(),
6264 }))
6265 }
6266 }
6267 None => {
6268 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6269 name: name.to_ast_string_simple(),
6270 object_type,
6271 });
6272
6273 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6274 }
6275 }
6276}
6277
6278pub fn describe_alter_object_rename(
6279 _: &StatementContext,
6280 _: AlterObjectRenameStatement,
6281) -> Result<StatementDesc, PlanError> {
6282 Ok(StatementDesc::new(None))
6283}
6284
6285pub fn plan_alter_object_rename(
6286 scx: &mut StatementContext,
6287 AlterObjectRenameStatement {
6288 name,
6289 object_type,
6290 to_item_name,
6291 if_exists,
6292 }: AlterObjectRenameStatement,
6293) -> Result<Plan, PlanError> {
6294 let object_type = object_type.into();
6295 match (object_type, name) {
6296 (
6297 ObjectType::View
6298 | ObjectType::MaterializedView
6299 | ObjectType::Table
6300 | ObjectType::Source
6301 | ObjectType::Index
6302 | ObjectType::Sink
6303 | ObjectType::Secret
6304 | ObjectType::Connection,
6305 UnresolvedObjectName::Item(name),
6306 ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6307 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6308 plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6309 }
6310 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6311 plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6312 }
6313 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6314 plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6315 }
6316 (object_type, name) => {
6317 unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6318 }
6319 }
6320}
6321
6322pub fn plan_alter_schema_rename(
6323 scx: &mut StatementContext,
6324 name: UnresolvedSchemaName,
6325 to_schema_name: Ident,
6326 if_exists: bool,
6327) -> Result<Plan, PlanError> {
6328 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6329 let object_type = ObjectType::Schema;
6330 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6331 name: name.to_ast_string_simple(),
6332 object_type,
6333 });
6334 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6335 };
6336
6337 if scx
6339 .resolve_schema_in_database(&db_spec, &to_schema_name)
6340 .is_ok()
6341 {
6342 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6343 to_schema_name.clone().into_string(),
6344 )));
6345 }
6346
6347 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6349 if schema.id().is_system() {
6350 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6351 }
6352
6353 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6354 cur_schema_spec: (db_spec, schema_spec),
6355 new_schema_name: to_schema_name.into_string(),
6356 }))
6357}
6358
6359pub fn plan_alter_schema_swap<F>(
6360 scx: &mut StatementContext,
6361 name_a: UnresolvedSchemaName,
6362 name_b: Ident,
6363 gen_temp_suffix: F,
6364) -> Result<Plan, PlanError>
6365where
6366 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6367{
6368 let schema_a = scx.resolve_schema(name_a.clone())?;
6369
6370 let db_spec = schema_a.database().clone();
6371 if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6372 sql_bail!("cannot swap schemas that are in the ambient database");
6373 };
6374 let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6375
6376 if schema_a.id().is_system() || schema_b.id().is_system() {
6378 bail_never_supported!("swapping a system schema".to_string())
6379 }
6380
6381 let check = |temp_suffix: &str| {
6385 let mut temp_name = ident!("mz_schema_swap_");
6386 temp_name.append_lossy(temp_suffix);
6387 scx.resolve_schema_in_database(&db_spec, &temp_name)
6388 .is_err()
6389 };
6390 let temp_suffix = gen_temp_suffix(&check)?;
6391 let name_temp = format!("mz_schema_swap_{temp_suffix}");
6392
6393 Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6394 schema_a_spec: (*schema_a.database(), *schema_a.id()),
6395 schema_a_name: schema_a.name().schema.to_string(),
6396 schema_b_spec: (*schema_b.database(), *schema_b.id()),
6397 schema_b_name: schema_b.name().schema.to_string(),
6398 name_temp,
6399 }))
6400}
6401
6402pub fn plan_alter_item_rename(
6403 scx: &mut StatementContext,
6404 object_type: ObjectType,
6405 name: UnresolvedItemName,
6406 to_item_name: Ident,
6407 if_exists: bool,
6408) -> Result<Plan, PlanError> {
6409 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6410 Ok(r) => r,
6411 Err(PlanError::MismatchedObjectType {
6413 name,
6414 is_type: ObjectType::MaterializedView,
6415 expected_type: ObjectType::View,
6416 }) => {
6417 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6418 }
6419 e => e?,
6420 };
6421
6422 match resolved {
6423 Some(entry) => {
6424 let full_name = scx.catalog.resolve_full_name(entry.name());
6425 let item_type = entry.item_type();
6426
6427 let proposed_name = QualifiedItemName {
6428 qualifiers: entry.name().qualifiers.clone(),
6429 item: to_item_name.clone().into_string(),
6430 };
6431
6432 let conflicting_type_exists;
6436 let conflicting_item_exists;
6437 if item_type == CatalogItemType::Type {
6438 conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6439 conflicting_item_exists = scx
6440 .catalog
6441 .get_item_by_name(&proposed_name)
6442 .map(|item| item.item_type().conflicts_with_type())
6443 .unwrap_or(false);
6444 } else {
6445 conflicting_type_exists = item_type.conflicts_with_type()
6446 && scx.catalog.get_type_by_name(&proposed_name).is_some();
6447 conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6448 };
6449 if conflicting_type_exists || conflicting_item_exists {
6450 sql_bail!("catalog item '{}' already exists", to_item_name);
6451 }
6452
6453 Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6454 id: entry.id(),
6455 current_full_name: full_name,
6456 to_name: normalize::ident(to_item_name),
6457 object_type,
6458 }))
6459 }
6460 None => {
6461 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6462 name: name.to_ast_string_simple(),
6463 object_type,
6464 });
6465
6466 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6467 }
6468 }
6469}
6470
6471pub fn plan_alter_cluster_rename(
6472 scx: &mut StatementContext,
6473 object_type: ObjectType,
6474 name: Ident,
6475 to_name: Ident,
6476 if_exists: bool,
6477) -> Result<Plan, PlanError> {
6478 match resolve_cluster(scx, &name, if_exists)? {
6479 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6480 id: entry.id(),
6481 name: entry.name().to_string(),
6482 to_name: ident(to_name),
6483 })),
6484 None => {
6485 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6486 name: name.to_ast_string_simple(),
6487 object_type,
6488 });
6489
6490 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6491 }
6492 }
6493}
6494
6495pub fn plan_alter_cluster_swap<F>(
6496 scx: &mut StatementContext,
6497 name_a: Ident,
6498 name_b: Ident,
6499 gen_temp_suffix: F,
6500) -> Result<Plan, PlanError>
6501where
6502 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6503{
6504 let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6505 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6506
6507 let check = |temp_suffix: &str| {
6508 let mut temp_name = ident!("mz_schema_swap_");
6509 temp_name.append_lossy(temp_suffix);
6510 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6511 Err(CatalogError::UnknownCluster(_)) => true,
6513 Ok(_) | Err(_) => false,
6515 }
6516 };
6517 let temp_suffix = gen_temp_suffix(&check)?;
6518 let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6519
6520 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6521 id_a: cluster_a.id(),
6522 id_b: cluster_b.id(),
6523 name_a: name_a.into_string(),
6524 name_b: name_b.into_string(),
6525 name_temp,
6526 }))
6527}
6528
6529pub fn plan_alter_cluster_replica_rename(
6530 scx: &mut StatementContext,
6531 object_type: ObjectType,
6532 name: QualifiedReplica,
6533 to_item_name: Ident,
6534 if_exists: bool,
6535) -> Result<Plan, PlanError> {
6536 match resolve_cluster_replica(scx, &name, if_exists)? {
6537 Some((cluster, replica)) => {
6538 ensure_cluster_is_not_managed(scx, cluster.id())?;
6539 Ok(Plan::AlterClusterReplicaRename(
6540 AlterClusterReplicaRenamePlan {
6541 cluster_id: cluster.id(),
6542 replica_id: replica,
6543 name: QualifiedReplica {
6544 cluster: Ident::new(cluster.name())?,
6545 replica: name.replica,
6546 },
6547 to_name: normalize::ident(to_item_name),
6548 },
6549 ))
6550 }
6551 None => {
6552 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6553 name: name.to_ast_string_simple(),
6554 object_type,
6555 });
6556
6557 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6558 }
6559 }
6560}
6561
6562pub fn describe_alter_object_swap(
6563 _: &StatementContext,
6564 _: AlterObjectSwapStatement,
6565) -> Result<StatementDesc, PlanError> {
6566 Ok(StatementDesc::new(None))
6567}
6568
6569pub fn plan_alter_object_swap(
6570 scx: &mut StatementContext,
6571 stmt: AlterObjectSwapStatement,
6572) -> Result<Plan, PlanError> {
6573 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6574
6575 let AlterObjectSwapStatement {
6576 object_type,
6577 name_a,
6578 name_b,
6579 } = stmt;
6580 let object_type = object_type.into();
6581
6582 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6584 let mut attempts = 0;
6585 let name_temp = loop {
6586 attempts += 1;
6587 if attempts > 10 {
6588 tracing::warn!("Unable to generate temp id for swapping");
6589 sql_bail!("unable to swap!");
6590 }
6591
6592 let short_id = mz_ore::id_gen::temp_id();
6594 if check_fn(&short_id) {
6595 break short_id;
6596 }
6597 };
6598
6599 Ok(name_temp)
6600 };
6601
6602 match (object_type, name_a, name_b) {
6603 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6604 plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6605 }
6606 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6607 plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6608 }
6609 (object_type, _, _) => Err(PlanError::Unsupported {
6610 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6611 discussion_no: None,
6612 }),
6613 }
6614}
6615
6616pub fn describe_alter_retain_history(
6617 _: &StatementContext,
6618 _: AlterRetainHistoryStatement<Aug>,
6619) -> Result<StatementDesc, PlanError> {
6620 Ok(StatementDesc::new(None))
6621}
6622
6623pub fn plan_alter_retain_history(
6624 scx: &StatementContext,
6625 AlterRetainHistoryStatement {
6626 object_type,
6627 if_exists,
6628 name,
6629 history,
6630 }: AlterRetainHistoryStatement<Aug>,
6631) -> Result<Plan, PlanError> {
6632 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6633}
6634
6635fn alter_retain_history(
6636 scx: &StatementContext,
6637 object_type: ObjectType,
6638 if_exists: bool,
6639 name: UnresolvedObjectName,
6640 history: Option<WithOptionValue<Aug>>,
6641) -> Result<Plan, PlanError> {
6642 let name = match (object_type, name) {
6643 (
6644 ObjectType::View
6646 | ObjectType::MaterializedView
6647 | ObjectType::Table
6648 | ObjectType::Source
6649 | ObjectType::Index,
6650 UnresolvedObjectName::Item(name),
6651 ) => name,
6652 (object_type, _) => {
6653 sql_bail!("{object_type} does not support RETAIN HISTORY")
6654 }
6655 };
6656 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6657 Some(entry) => {
6658 let full_name = scx.catalog.resolve_full_name(entry.name());
6659 let item_type = entry.item_type();
6660
6661 if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6663 return Err(PlanError::AlterViewOnMaterializedView(
6664 full_name.to_string(),
6665 ));
6666 } else if object_type == ObjectType::View {
6667 sql_bail!("{object_type} does not support RETAIN HISTORY")
6668 } else if object_type != item_type {
6669 sql_bail!(
6670 "\"{}\" is a {} not a {}",
6671 full_name,
6672 entry.item_type(),
6673 format!("{object_type}").to_lowercase()
6674 )
6675 }
6676
6677 let (value, lcw) = match &history {
6679 Some(WithOptionValue::RetainHistoryFor(value)) => {
6680 let window = OptionalDuration::try_from_value(value.clone())?;
6681 (Some(value.clone()), window.0)
6682 }
6683 None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6685 _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6686 };
6687 let window = plan_retain_history(scx, lcw)?;
6688
6689 Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6690 id: entry.id(),
6691 value,
6692 window,
6693 object_type,
6694 }))
6695 }
6696 None => {
6697 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6698 name: name.to_ast_string_simple(),
6699 object_type,
6700 });
6701
6702 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6703 }
6704 }
6705}
6706
6707pub fn describe_alter_secret_options(
6708 _: &StatementContext,
6709 _: AlterSecretStatement<Aug>,
6710) -> Result<StatementDesc, PlanError> {
6711 Ok(StatementDesc::new(None))
6712}
6713
6714pub fn plan_alter_secret(
6715 scx: &mut StatementContext,
6716 stmt: AlterSecretStatement<Aug>,
6717) -> Result<Plan, PlanError> {
6718 let AlterSecretStatement {
6719 name,
6720 if_exists,
6721 value,
6722 } = stmt;
6723 let object_type = ObjectType::Secret;
6724 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6725 Some(entry) => entry.id(),
6726 None => {
6727 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6728 name: name.to_string(),
6729 object_type,
6730 });
6731
6732 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6733 }
6734 };
6735
6736 let secret_as = query::plan_secret_as(scx, value)?;
6737
6738 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6739}
6740
6741pub fn describe_alter_connection(
6742 _: &StatementContext,
6743 _: AlterConnectionStatement<Aug>,
6744) -> Result<StatementDesc, PlanError> {
6745 Ok(StatementDesc::new(None))
6746}
6747
// Generates `AlterConnectionOptionExtracted` for the `WITH (...)` options of
// `ALTER CONNECTION`. `plan_alter_connection` builds it with `try_from` and
// reads its optional boolean `validate` field.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6749
6750pub fn plan_alter_connection(
6751 scx: &StatementContext,
6752 stmt: AlterConnectionStatement<Aug>,
6753) -> Result<Plan, PlanError> {
6754 let AlterConnectionStatement {
6755 name,
6756 if_exists,
6757 actions,
6758 with_options,
6759 } = stmt;
6760 let conn_name = normalize::unresolved_item_name(name)?;
6761 let entry = match scx.catalog.resolve_item(&conn_name) {
6762 Ok(entry) => entry,
6763 Err(_) if if_exists => {
6764 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6765 name: conn_name.to_string(),
6766 object_type: ObjectType::Sink,
6767 });
6768
6769 return Ok(Plan::AlterNoop(AlterNoopPlan {
6770 object_type: ObjectType::Connection,
6771 }));
6772 }
6773 Err(e) => return Err(e.into()),
6774 };
6775
6776 let connection = entry.connection()?;
6777
6778 if actions
6779 .iter()
6780 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6781 {
6782 if actions.len() > 1 {
6783 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6784 }
6785
6786 if !with_options.is_empty() {
6787 sql_bail!(
6788 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6789 with_options
6790 .iter()
6791 .map(|o| o.to_ast_string_simple())
6792 .join(", ")
6793 );
6794 }
6795
6796 if !matches!(connection, Connection::Ssh(_)) {
6797 sql_bail!(
6798 "{} is not an SSH connection",
6799 scx.catalog.resolve_full_name(entry.name())
6800 )
6801 }
6802
6803 return Ok(Plan::AlterConnection(AlterConnectionPlan {
6804 id: entry.id(),
6805 action: crate::plan::AlterConnectionAction::RotateKeys,
6806 }));
6807 }
6808
6809 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6810 if options.validate.is_some() {
6811 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6812 }
6813
6814 let validate = match options.validate {
6815 Some(val) => val,
6816 None => {
6817 scx.catalog
6818 .system_vars()
6819 .enable_default_connection_validation()
6820 && connection.validate_by_default()
6821 }
6822 };
6823
6824 let connection_type = match connection {
6825 Connection::Aws(_) => CreateConnectionType::Aws,
6826 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6827 Connection::Kafka(_) => CreateConnectionType::Kafka,
6828 Connection::Csr(_) => CreateConnectionType::Csr,
6829 Connection::Postgres(_) => CreateConnectionType::Postgres,
6830 Connection::Ssh(_) => CreateConnectionType::Ssh,
6831 Connection::MySql(_) => CreateConnectionType::MySql,
6832 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6833 };
6834
6835 let specified_options: BTreeSet<_> = actions
6837 .iter()
6838 .map(|action: &AlterConnectionAction<Aug>| match action {
6839 AlterConnectionAction::SetOption(option) => option.name.clone(),
6840 AlterConnectionAction::DropOption(name) => name.clone(),
6841 AlterConnectionAction::RotateKeys => unreachable!(),
6842 })
6843 .collect();
6844
6845 for invalid in INALTERABLE_OPTIONS {
6846 if specified_options.contains(invalid) {
6847 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6848 }
6849 }
6850
6851 connection::validate_options_per_connection_type(connection_type, specified_options)?;
6852
6853 let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
6855 actions.into_iter().partition_map(|action| match action {
6856 AlterConnectionAction::SetOption(option) => Either::Left(option),
6857 AlterConnectionAction::DropOption(name) => Either::Right(name),
6858 AlterConnectionAction::RotateKeys => unreachable!(),
6859 });
6860
6861 let set_options: BTreeMap<_, _> = set_options_vec
6862 .clone()
6863 .into_iter()
6864 .map(|option| (option.name, option.value))
6865 .collect();
6866
6867 let connection_options_extracted =
6871 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
6872
6873 let duplicates: Vec<_> = connection_options_extracted
6874 .seen
6875 .intersection(&drop_options)
6876 .collect();
6877
6878 if !duplicates.is_empty() {
6879 sql_bail!(
6880 "cannot both SET and DROP/RESET options {}",
6881 duplicates
6882 .iter()
6883 .map(|option| option.to_string())
6884 .join(", ")
6885 )
6886 }
6887
6888 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
6889 let set_options_count = mutually_exclusive_options
6890 .iter()
6891 .filter(|o| set_options.contains_key(o))
6892 .count();
6893 let drop_options_count = mutually_exclusive_options
6894 .iter()
6895 .filter(|o| drop_options.contains(o))
6896 .count();
6897
6898 if set_options_count > 0 && drop_options_count > 0 {
6900 sql_bail!(
6901 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
6902 connection_type,
6903 mutually_exclusive_options
6904 .iter()
6905 .map(|option| option.to_string())
6906 .join(", ")
6907 )
6908 }
6909
6910 if set_options_count > 0 || drop_options_count > 0 {
6915 drop_options.extend(mutually_exclusive_options.iter().cloned());
6916 }
6917
6918 }
6921
6922 Ok(Plan::AlterConnection(AlterConnectionPlan {
6923 id: entry.id(),
6924 action: crate::plan::AlterConnectionAction::AlterOptions {
6925 set_options,
6926 drop_options,
6927 validate,
6928 },
6929 }))
6930}
6931
6932pub fn describe_alter_sink(
6933 _: &StatementContext,
6934 _: AlterSinkStatement<Aug>,
6935) -> Result<StatementDesc, PlanError> {
6936 Ok(StatementDesc::new(None))
6937}
6938
6939pub fn plan_alter_sink(
6940 scx: &mut StatementContext,
6941 stmt: AlterSinkStatement<Aug>,
6942) -> Result<Plan, PlanError> {
6943 let AlterSinkStatement {
6944 sink_name,
6945 if_exists,
6946 action,
6947 } = stmt;
6948
6949 let object_type = ObjectType::Sink;
6950 let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
6951
6952 let Some(item) = item else {
6953 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6954 name: sink_name.to_string(),
6955 object_type,
6956 });
6957
6958 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6959 };
6960 let item = item.at_version(RelationVersionSelector::Latest);
6962
6963 match action {
6964 AlterSinkAction::ChangeRelation(new_from) => {
6965 let create_sql = item.create_sql();
6967 let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
6968 let [stmt]: [StatementParseResult; 1] = stmts
6969 .try_into()
6970 .expect("create sql of sink was not exactly one statement");
6971 let Statement::CreateSink(mut stmt) = stmt.ast else {
6972 unreachable!("invalid create SQL for sink item");
6973 };
6974
6975 let cur_version = stmt
6977 .with_options
6978 .drain_filter_swapping(|o| o.name == CreateSinkOptionName::Version)
6979 .map(|o| u64::try_from_value(o.value).expect("invalid sink create_sql"))
6980 .max()
6981 .unwrap_or(0);
6982 let new_version = cur_version + 1;
6983 stmt.with_options.push(CreateSinkOption {
6984 name: CreateSinkOptionName::Version,
6985 value: Some(WithOptionValue::Value(Value::Number(
6986 new_version.to_string(),
6987 ))),
6988 });
6989
6990 let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
6992 stmt.from = new_from;
6993
6994 let Plan::CreateSink(plan) = plan_sink(scx, stmt)? else {
6996 unreachable!("invalid plan for CREATE SINK statement");
6997 };
6998
6999 Ok(Plan::AlterSink(AlterSinkPlan {
7000 item_id: item.id(),
7001 global_id: item.global_id(),
7002 sink: plan.sink,
7003 with_snapshot: plan.with_snapshot,
7004 in_cluster: plan.in_cluster,
7005 }))
7006 }
7007 AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7008 AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7009 }
7010}
7011
7012pub fn describe_alter_source(
7013 _: &StatementContext,
7014 _: AlterSourceStatement<Aug>,
7015) -> Result<StatementDesc, PlanError> {
7016 Ok(StatementDesc::new(None))
7018}
7019
// Declares the typed extraction for `AlterSourceAddSubsourceOption`:
// `TextColumns` and `ExcludeColumns` are lists of item names that default to
// an empty list, and `Details` is a string with no default given here.
// NOTE(review): by analogy with `AlterConnectionOptionExtracted`, this
// presumably generates `AlterSourceAddSubsourceOptionExtracted` — confirm
// against the macro definition.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7026
7027pub fn plan_alter_source(
7028 scx: &mut StatementContext,
7029 stmt: AlterSourceStatement<Aug>,
7030) -> Result<Plan, PlanError> {
7031 let AlterSourceStatement {
7032 source_name,
7033 if_exists,
7034 action,
7035 } = stmt;
7036 let object_type = ObjectType::Source;
7037
7038 if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7039 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7040 name: source_name.to_string(),
7041 object_type,
7042 });
7043
7044 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7045 }
7046
7047 match action {
7048 AlterSourceAction::SetOptions(options) => {
7049 let mut options = options.into_iter();
7050 let option = options.next().unwrap();
7051 if option.name == CreateSourceOptionName::RetainHistory {
7052 if options.next().is_some() {
7053 sql_bail!("RETAIN HISTORY must be only option");
7054 }
7055 return alter_retain_history(
7056 scx,
7057 object_type,
7058 if_exists,
7059 UnresolvedObjectName::Item(source_name),
7060 option.value,
7061 );
7062 }
7063 sql_bail!(
7066 "Cannot modify the {} of a SOURCE.",
7067 option.name.to_ast_string_simple()
7068 );
7069 }
7070 AlterSourceAction::ResetOptions(reset) => {
7071 let mut options = reset.into_iter();
7072 let option = options.next().unwrap();
7073 if option == CreateSourceOptionName::RetainHistory {
7074 if options.next().is_some() {
7075 sql_bail!("RETAIN HISTORY must be only option");
7076 }
7077 return alter_retain_history(
7078 scx,
7079 object_type,
7080 if_exists,
7081 UnresolvedObjectName::Item(source_name),
7082 None,
7083 );
7084 }
7085 sql_bail!(
7086 "Cannot modify the {} of a SOURCE.",
7087 option.to_ast_string_simple()
7088 );
7089 }
7090 AlterSourceAction::DropSubsources { .. } => {
7091 sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7092 }
7093 AlterSourceAction::AddSubsources { .. } => {
7094 unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7095 }
7096 AlterSourceAction::RefreshReferences => {
7097 unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7098 }
7099 };
7100}
7101
7102pub fn describe_alter_system_set(
7103 _: &StatementContext,
7104 _: AlterSystemSetStatement,
7105) -> Result<StatementDesc, PlanError> {
7106 Ok(StatementDesc::new(None))
7107}
7108
7109pub fn plan_alter_system_set(
7110 _: &StatementContext,
7111 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7112) -> Result<Plan, PlanError> {
7113 let name = name.to_string();
7114 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7115 name,
7116 value: scl::plan_set_variable_to(to)?,
7117 }))
7118}
7119
7120pub fn describe_alter_system_reset(
7121 _: &StatementContext,
7122 _: AlterSystemResetStatement,
7123) -> Result<StatementDesc, PlanError> {
7124 Ok(StatementDesc::new(None))
7125}
7126
7127pub fn plan_alter_system_reset(
7128 _: &StatementContext,
7129 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7130) -> Result<Plan, PlanError> {
7131 let name = name.to_string();
7132 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7133}
7134
7135pub fn describe_alter_system_reset_all(
7136 _: &StatementContext,
7137 _: AlterSystemResetAllStatement,
7138) -> Result<StatementDesc, PlanError> {
7139 Ok(StatementDesc::new(None))
7140}
7141
7142pub fn plan_alter_system_reset_all(
7143 _: &StatementContext,
7144 _: AlterSystemResetAllStatement,
7145) -> Result<Plan, PlanError> {
7146 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7147}
7148
7149pub fn describe_alter_role(
7150 _: &StatementContext,
7151 _: AlterRoleStatement<Aug>,
7152) -> Result<StatementDesc, PlanError> {
7153 Ok(StatementDesc::new(None))
7154}
7155
7156pub fn plan_alter_role(
7157 _scx: &StatementContext,
7158 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7159) -> Result<Plan, PlanError> {
7160 let option = match option {
7161 AlterRoleOption::Attributes(attrs) => {
7162 let attrs = plan_role_attributes(attrs)?;
7163 PlannedAlterRoleOption::Attributes(attrs)
7164 }
7165 AlterRoleOption::Variable(variable) => {
7166 let var = plan_role_variable(variable)?;
7167 PlannedAlterRoleOption::Variable(var)
7168 }
7169 };
7170
7171 Ok(Plan::AlterRole(AlterRolePlan {
7172 id: name.id,
7173 name: name.name,
7174 option,
7175 }))
7176}
7177
7178pub fn describe_alter_table_add_column(
7179 _: &StatementContext,
7180 _: AlterTableAddColumnStatement<Aug>,
7181) -> Result<StatementDesc, PlanError> {
7182 Ok(StatementDesc::new(None))
7183}
7184
7185pub fn plan_alter_table_add_column(
7186 scx: &StatementContext,
7187 stmt: AlterTableAddColumnStatement<Aug>,
7188) -> Result<Plan, PlanError> {
7189 let AlterTableAddColumnStatement {
7190 if_exists,
7191 name,
7192 if_col_not_exist,
7193 column_name,
7194 data_type,
7195 } = stmt;
7196 let object_type = ObjectType::Table;
7197
7198 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7199
7200 let (relation_id, item_name, desc) =
7201 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7202 Some(item) => {
7203 let item_name = scx.catalog.resolve_full_name(item.name());
7205 let item = item.at_version(RelationVersionSelector::Latest);
7206 let desc = item.desc(&item_name)?.into_owned();
7207 (item.id(), item_name, desc)
7208 }
7209 None => {
7210 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7211 name: name.to_ast_string_simple(),
7212 object_type,
7213 });
7214 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7215 }
7216 };
7217
7218 let column_name = ColumnName::from(column_name.as_str());
7219 if desc.get_by_name(&column_name).is_some() {
7220 if if_col_not_exist {
7221 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7222 column_name: column_name.to_string(),
7223 object_name: item_name.item,
7224 });
7225 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7226 } else {
7227 return Err(PlanError::ColumnAlreadyExists {
7228 column_name,
7229 object_name: item_name.item,
7230 });
7231 }
7232 }
7233
7234 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7235 let column_type = scalar_type.nullable(true);
7237 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7239
7240 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7241 relation_id,
7242 column_name,
7243 column_type,
7244 raw_sql_type,
7245 }))
7246}
7247
7248pub fn describe_comment(
7249 _: &StatementContext,
7250 _: CommentStatement<Aug>,
7251) -> Result<StatementDesc, PlanError> {
7252 Ok(StatementDesc::new(None))
7253}
7254
7255pub fn plan_comment(
7256 scx: &mut StatementContext,
7257 stmt: CommentStatement<Aug>,
7258) -> Result<Plan, PlanError> {
7259 const MAX_COMMENT_LENGTH: usize = 1024;
7260
7261 let CommentStatement { object, comment } = stmt;
7262
7263 if let Some(c) = &comment {
7265 if c.len() > 1024 {
7266 return Err(PlanError::CommentTooLong {
7267 length: c.len(),
7268 max_size: MAX_COMMENT_LENGTH,
7269 });
7270 }
7271 }
7272
7273 let (object_id, column_pos) = match &object {
7274 com_ty @ CommentObjectType::Table { name }
7275 | com_ty @ CommentObjectType::View { name }
7276 | com_ty @ CommentObjectType::MaterializedView { name }
7277 | com_ty @ CommentObjectType::Index { name }
7278 | com_ty @ CommentObjectType::Func { name }
7279 | com_ty @ CommentObjectType::Connection { name }
7280 | com_ty @ CommentObjectType::Source { name }
7281 | com_ty @ CommentObjectType::Sink { name }
7282 | com_ty @ CommentObjectType::Secret { name }
7283 | com_ty @ CommentObjectType::ContinualTask { name } => {
7284 let item = scx.get_item_by_resolved_name(name)?;
7285 match (com_ty, item.item_type()) {
7286 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7287 (CommentObjectId::Table(item.id()), None)
7288 }
7289 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7290 (CommentObjectId::View(item.id()), None)
7291 }
7292 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7293 (CommentObjectId::MaterializedView(item.id()), None)
7294 }
7295 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7296 (CommentObjectId::Index(item.id()), None)
7297 }
7298 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7299 (CommentObjectId::Func(item.id()), None)
7300 }
7301 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7302 (CommentObjectId::Connection(item.id()), None)
7303 }
7304 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7305 (CommentObjectId::Source(item.id()), None)
7306 }
7307 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7308 (CommentObjectId::Sink(item.id()), None)
7309 }
7310 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7311 (CommentObjectId::Secret(item.id()), None)
7312 }
7313 (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7314 (CommentObjectId::ContinualTask(item.id()), None)
7315 }
7316 (com_ty, cat_ty) => {
7317 let expected_type = match com_ty {
7318 CommentObjectType::Table { .. } => ObjectType::Table,
7319 CommentObjectType::View { .. } => ObjectType::View,
7320 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7321 CommentObjectType::Index { .. } => ObjectType::Index,
7322 CommentObjectType::Func { .. } => ObjectType::Func,
7323 CommentObjectType::Connection { .. } => ObjectType::Connection,
7324 CommentObjectType::Source { .. } => ObjectType::Source,
7325 CommentObjectType::Sink { .. } => ObjectType::Sink,
7326 CommentObjectType::Secret { .. } => ObjectType::Secret,
7327 _ => unreachable!("these are the only types we match on"),
7328 };
7329
7330 return Err(PlanError::InvalidObjectType {
7331 expected_type: SystemObjectType::Object(expected_type),
7332 actual_type: SystemObjectType::Object(cat_ty.into()),
7333 object_name: item.name().item.clone(),
7334 });
7335 }
7336 }
7337 }
7338 CommentObjectType::Type { ty } => match ty {
7339 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7340 sql_bail!("cannot comment on anonymous list or map type");
7341 }
7342 ResolvedDataType::Named { id, modifiers, .. } => {
7343 if !modifiers.is_empty() {
7344 sql_bail!("cannot comment on type with modifiers");
7345 }
7346 (CommentObjectId::Type(*id), None)
7347 }
7348 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7349 },
7350 CommentObjectType::Column { name } => {
7351 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7352 match item.item_type() {
7353 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7354 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7355 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7356 CatalogItemType::MaterializedView => {
7357 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7358 }
7359 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7360 r => {
7361 return Err(PlanError::Unsupported {
7362 feature: format!("Specifying comments on a column of {r}"),
7363 discussion_no: None,
7364 });
7365 }
7366 }
7367 }
7368 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7369 CommentObjectType::Database { name } => {
7370 (CommentObjectId::Database(*name.database_id()), None)
7371 }
7372 CommentObjectType::Schema { name } => (
7373 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7374 None,
7375 ),
7376 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7377 CommentObjectType::ClusterReplica { name } => {
7378 let replica = scx.catalog.resolve_cluster_replica(name)?;
7379 (
7380 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7381 None,
7382 )
7383 }
7384 CommentObjectType::NetworkPolicy { name } => {
7385 (CommentObjectId::NetworkPolicy(name.id), None)
7386 }
7387 };
7388
7389 if let Some(p) = column_pos {
7395 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7396 max_num_columns: MAX_NUM_COLUMNS,
7397 req_num_columns: p,
7398 })?;
7399 }
7400
7401 Ok(Plan::Comment(CommentPlan {
7402 object_id,
7403 sub_component: column_pos,
7404 comment,
7405 }))
7406}
7407
7408pub(crate) fn resolve_cluster<'a>(
7409 scx: &'a StatementContext,
7410 name: &'a Ident,
7411 if_exists: bool,
7412) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7413 match scx.resolve_cluster(Some(name)) {
7414 Ok(cluster) => Ok(Some(cluster)),
7415 Err(_) if if_exists => Ok(None),
7416 Err(e) => Err(e),
7417 }
7418}
7419
7420pub(crate) fn resolve_cluster_replica<'a>(
7421 scx: &'a StatementContext,
7422 name: &QualifiedReplica,
7423 if_exists: bool,
7424) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7425 match scx.resolve_cluster(Some(&name.cluster)) {
7426 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7427 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7428 None if if_exists => Ok(None),
7429 None => Err(sql_err!(
7430 "CLUSTER {} has no CLUSTER REPLICA named {}",
7431 cluster.name(),
7432 name.replica.as_str().quoted(),
7433 )),
7434 },
7435 Err(_) if if_exists => Ok(None),
7436 Err(e) => Err(e),
7437 }
7438}
7439
7440pub(crate) fn resolve_database<'a>(
7441 scx: &'a StatementContext,
7442 name: &'a UnresolvedDatabaseName,
7443 if_exists: bool,
7444) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7445 match scx.resolve_database(name) {
7446 Ok(database) => Ok(Some(database)),
7447 Err(_) if if_exists => Ok(None),
7448 Err(e) => Err(e),
7449 }
7450}
7451
7452pub(crate) fn resolve_schema<'a>(
7453 scx: &'a StatementContext,
7454 name: UnresolvedSchemaName,
7455 if_exists: bool,
7456) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7457 match scx.resolve_schema(name) {
7458 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7459 Err(_) if if_exists => Ok(None),
7460 Err(e) => Err(e),
7461 }
7462}
7463
7464pub(crate) fn resolve_network_policy<'a>(
7465 scx: &'a StatementContext,
7466 name: Ident,
7467 if_exists: bool,
7468) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7469 match scx.catalog.resolve_network_policy(&name.to_string()) {
7470 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7471 id: policy.id(),
7472 name: policy.name().to_string(),
7473 })),
7474 Err(_) if if_exists => Ok(None),
7475 Err(e) => Err(e.into()),
7476 }
7477}
7478
7479pub(crate) fn resolve_item_or_type<'a>(
7480 scx: &'a StatementContext,
7481 object_type: ObjectType,
7482 name: UnresolvedItemName,
7483 if_exists: bool,
7484) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7485 let name = normalize::unresolved_item_name(name)?;
7486 let catalog_item = match object_type {
7487 ObjectType::Type => scx.catalog.resolve_type(&name),
7488 _ => scx.catalog.resolve_item(&name),
7489 };
7490
7491 match catalog_item {
7492 Ok(item) => {
7493 let is_type = ObjectType::from(item.item_type());
7494 if object_type == is_type {
7495 Ok(Some(item))
7496 } else {
7497 Err(PlanError::MismatchedObjectType {
7498 name: scx.catalog.minimal_qualification(item.name()),
7499 is_type,
7500 expected_type: object_type,
7501 })
7502 }
7503 }
7504 Err(_) if if_exists => Ok(None),
7505 Err(e) => Err(e.into()),
7506 }
7507}
7508
7509fn ensure_cluster_is_not_managed(
7511 scx: &StatementContext,
7512 cluster_id: ClusterId,
7513) -> Result<(), PlanError> {
7514 let cluster = scx.catalog.get_cluster(cluster_id);
7515 if cluster.is_managed() {
7516 Err(PlanError::ManagedCluster {
7517 cluster_name: cluster.name().to_string(),
7518 })
7519 } else {
7520 Ok(())
7521 }
7522}